diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2d266b0ed2..3ce378ed33 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@
 * [CHANGE] Ingester: Disable uploading compacted blocks and overlapping compaction in ingester. #5735
 * [CHANGE] Distributor: Count the number of rate-limited samples in `distributor_samples_in_total`. #5714
 * [CHANGE] Ruler: Remove `cortex_ruler_write_requests_total`, `cortex_ruler_write_requests_failed_total`, `cortex_ruler_queries_total`, `cortex_ruler_queries_failed_total`, and `cortex_ruler_query_seconds_total` metrics for the tenant when the ruler deletes the manager for the tenant. #5772
+* [CHANGE] Querier: Mark `-querier.ingester-streaming` flag as deprecated. Ingester streaming is now always enabled for queries. #5817
 * [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
 * [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
 * [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731
diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md
index 7e72d635c8..cc9b5f0479 100644
--- a/docs/blocks-storage/querier.md
+++ b/docs/blocks-storage/querier.md
@@ -114,10 +114,6 @@ querier:
   # CLI flag: -querier.batch-iterators
   [batch_iterators: <boolean> | default = true]

-  # Use streaming RPCs to query ingester.
-  # CLI flag: -querier.ingester-streaming
-  [ingester_streaming: <boolean> | default = true]
-
   # Use streaming RPCs for metadata APIs from ingester.
   # CLI flag: -querier.ingester-metadata-streaming
   [ingester_metadata_streaming: <boolean> | default = false]
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 89e9a26703..e0597f15b2 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -3596,10 +3596,6 @@ The `querier_config` configures the Cortex querier.
 # CLI flag: -querier.batch-iterators
 [batch_iterators: <boolean> | default = true]

-# Use streaming RPCs to query ingester.
-# CLI flag: -querier.ingester-streaming
-[ingester_streaming: <boolean> | default = true]
-
 # Use streaming RPCs for metadata APIs from ingester.
 # CLI flag: -querier.ingester-metadata-streaming
 [ingester_metadata_streaming: <boolean> | default = false]
diff --git a/integration/querier_test.go b/integration/querier_test.go
index be843b5db0..f83d80a781 100644
--- a/integration/querier_test.go
+++ b/integration/querier_test.go
@@ -27,87 +27,70 @@ import (
 func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
     tests := map[string]struct {
-        blocksShardingStrategy   string // Empty means sharding is disabled.
-        tenantShardSize          int
-        ingesterStreamingEnabled bool
-        indexCacheBackend        string
-        chunkCacheBackend        string
-        bucketIndexEnabled       bool
+        blocksShardingStrategy string // Empty means sharding is disabled.
+        tenantShardSize        int
+        indexCacheBackend      string
+        chunkCacheBackend      string
+        bucketIndexEnabled     bool
     }{
-        "blocks sharding disabled, ingester gRPC streaming disabled, memcached index cache": {
-            blocksShardingStrategy:   "",
-            ingesterStreamingEnabled: false,
-            indexCacheBackend:        tsdb.IndexCacheBackendMemcached,
-            chunkCacheBackend:        tsdb.CacheBackendMemcached,
+        "blocks sharding disabled, memcached index cache": {
+            blocksShardingStrategy: "",
+            indexCacheBackend:      tsdb.IndexCacheBackendMemcached,
+            chunkCacheBackend:      tsdb.CacheBackendMemcached,
         },
-        "blocks sharding disabled, ingester gRPC streaming disabled, multilevel index cache (inmemory, memcached)": {
-            blocksShardingStrategy:   "",
-            ingesterStreamingEnabled: false,
-            indexCacheBackend:        fmt.Sprintf("%v,%v", tsdb.IndexCacheBackendInMemory, tsdb.IndexCacheBackendMemcached),
-            chunkCacheBackend:        tsdb.CacheBackendMemcached,
+        "blocks sharding disabled, multilevel index cache (inmemory, memcached)": {
+            blocksShardingStrategy: "",
+            indexCacheBackend:      fmt.Sprintf("%v,%v", tsdb.IndexCacheBackendInMemory, tsdb.IndexCacheBackendMemcached),
+            chunkCacheBackend:      tsdb.CacheBackendMemcached,
         },
-        "blocks sharding disabled, ingester gRPC streaming disabled, redis index cache": {
-            blocksShardingStrategy:   "",
-            ingesterStreamingEnabled: false,
-            indexCacheBackend:        tsdb.IndexCacheBackendRedis,
-            chunkCacheBackend:        tsdb.CacheBackendRedis,
+        "blocks sharding disabled, redis index cache": {
+            blocksShardingStrategy: "",
+            indexCacheBackend:      tsdb.IndexCacheBackendRedis,
+            chunkCacheBackend:      tsdb.CacheBackendRedis,
         },
-        "blocks sharding disabled, ingester gRPC streaming disabled, multilevel index cache (inmemory, redis)": {
-            blocksShardingStrategy:   "",
-            ingesterStreamingEnabled: false,
-            indexCacheBackend:        fmt.Sprintf("%v,%v", tsdb.IndexCacheBackendInMemory, tsdb.IndexCacheBackendRedis),
-            chunkCacheBackend:        tsdb.CacheBackendRedis,
+        "blocks sharding disabled, multilevel index cache (inmemory, redis)": {
+            blocksShardingStrategy: "",
+            indexCacheBackend:      fmt.Sprintf("%v,%v", tsdb.IndexCacheBackendInMemory, tsdb.IndexCacheBackendRedis),
+            chunkCacheBackend:      tsdb.CacheBackendRedis,
         },
-        "blocks default sharding, ingester gRPC streaming disabled, inmemory index cache": {
-            blocksShardingStrategy:   "default",
-            ingesterStreamingEnabled: false,
-            indexCacheBackend:        tsdb.IndexCacheBackendInMemory,
+        "blocks default sharding, inmemory index cache": {
+            blocksShardingStrategy: "default",
+            indexCacheBackend:      tsdb.IndexCacheBackendInMemory,
         },
-        "blocks default sharding, ingester gRPC streaming enabled, inmemory index cache": {
-            blocksShardingStrategy:   "default",
-            ingesterStreamingEnabled: true,
-            indexCacheBackend:        tsdb.IndexCacheBackendInMemory,
+        "blocks default sharding, memcached index cache": {
+            blocksShardingStrategy: "default",
+            indexCacheBackend:      tsdb.IndexCacheBackendMemcached,
+            chunkCacheBackend:      tsdb.CacheBackendMemcached,
         },
-        "blocks default sharding, ingester gRPC streaming enabled, memcached index cache": {
-            blocksShardingStrategy:   "default",
-            ingesterStreamingEnabled: true,
-            indexCacheBackend:        tsdb.IndexCacheBackendMemcached,
-            chunkCacheBackend:        tsdb.CacheBackendMemcached,
+        "blocks shuffle sharding, memcached index cache": {
+            blocksShardingStrategy: "shuffle-sharding",
+            tenantShardSize:        1,
+            indexCacheBackend:      tsdb.IndexCacheBackendMemcached,
+            chunkCacheBackend:      tsdb.CacheBackendMemcached,
         },
-        "blocks shuffle sharding, ingester gRPC streaming enabled, memcached index cache": {
-            blocksShardingStrategy:   "shuffle-sharding",
-            tenantShardSize:          1,
-            ingesterStreamingEnabled: true,
-            indexCacheBackend:        tsdb.IndexCacheBackendMemcached,
-            chunkCacheBackend:        tsdb.CacheBackendMemcached,
+        "blocks default sharding, inmemory index cache, bucket index enabled": {
+            blocksShardingStrategy: "default",
+            indexCacheBackend:      tsdb.IndexCacheBackendInMemory,
+            bucketIndexEnabled:     true,
         },
-        "blocks default sharding, ingester gRPC streaming enabled, inmemory index cache, bucket index enabled": {
-            blocksShardingStrategy:   "default",
-            ingesterStreamingEnabled: true,
-            indexCacheBackend:        tsdb.IndexCacheBackendInMemory,
-            bucketIndexEnabled:       true,
+        "blocks shuffle sharding, memcached index cache, bucket index enabled": {
+            blocksShardingStrategy: "shuffle-sharding",
+            tenantShardSize:        1,
+            indexCacheBackend:      tsdb.IndexCacheBackendInMemory,
+            bucketIndexEnabled:     true,
         },
-        "blocks shuffle sharding, ingester gRPC streaming enabled, memcached index cache, bucket index enabled": {
-            blocksShardingStrategy:   "shuffle-sharding",
-            tenantShardSize:          1,
-            ingesterStreamingEnabled: true,
-            indexCacheBackend:        tsdb.IndexCacheBackendInMemory,
-            bucketIndexEnabled:       true,
+        "blocks default sharding, redis index cache, bucket index enabled": {
+            blocksShardingStrategy: "default",
+            indexCacheBackend:      tsdb.IndexCacheBackendRedis,
+            chunkCacheBackend:      tsdb.CacheBackendRedis,
+            bucketIndexEnabled:     true,
         },
-        "blocks default sharding, ingester gRPC streaming enabled, redis index cache, bucket index enabled": {
-            blocksShardingStrategy:   "default",
-            ingesterStreamingEnabled: true,
-            indexCacheBackend:        tsdb.IndexCacheBackendRedis,
-            chunkCacheBackend:        tsdb.CacheBackendRedis,
-            bucketIndexEnabled:       true,
-        },
-        "blocks shuffle sharding, ingester gRPC streaming enabled, redis index cache, bucket index enabled": {
-            blocksShardingStrategy:   "shuffle-sharding",
-            tenantShardSize:          1,
-            ingesterStreamingEnabled: true,
-            indexCacheBackend:        tsdb.IndexCacheBackendRedis,
-            chunkCacheBackend:        tsdb.CacheBackendRedis,
-            bucketIndexEnabled:       true,
+        "blocks shuffle sharding, redis index cache, bucket index enabled": {
+            blocksShardingStrategy: "shuffle-sharding",
+            tenantShardSize:        1,
+            indexCacheBackend:      tsdb.IndexCacheBackendRedis,
+            chunkCacheBackend:      tsdb.CacheBackendRedis,
+            bucketIndexEnabled:     true,
         },
     }
@@ -134,7 +117,6 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
         "-store-gateway.sharding-enabled":                   strconv.FormatBool(testCfg.blocksShardingStrategy != ""),
         "-store-gateway.sharding-strategy":                  testCfg.blocksShardingStrategy,
         "-store-gateway.tenant-shard-size":                  fmt.Sprintf("%d", testCfg.tenantShardSize),
-        "-querier.ingester-streaming":                       strconv.FormatBool(testCfg.ingesterStreamingEnabled),
         "-querier.query-store-for-labels-enabled":           "true",
         "-querier.thanos-engine":                            strconv.FormatBool(thanosEngine),
         "-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled),
@@ -319,52 +301,39 @@
 func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
     tests := map[string]struct {
-        blocksShardingEnabled    bool
-        ingesterStreamingEnabled bool
-        indexCacheBackend        string
-        bucketIndexEnabled       bool
+        blocksShardingEnabled bool
+        indexCacheBackend     string
+        bucketIndexEnabled    bool
     }{
-        "blocks sharding enabled, ingester gRPC streaming disabled, inmemory index cache": {
-            blocksShardingEnabled:    true,
-            ingesterStreamingEnabled: false,
-            indexCacheBackend:        tsdb.IndexCacheBackendInMemory,
-        },
-        "blocks sharding enabled, ingester gRPC streaming enabled, inmemory index cache": {
-            blocksShardingEnabled:    true,
-            ingesterStreamingEnabled: true,
-            indexCacheBackend:        tsdb.IndexCacheBackendInMemory,
+        "blocks sharding enabled, inmemory index cache": {
+            blocksShardingEnabled: true,
+            indexCacheBackend:     tsdb.IndexCacheBackendInMemory,
         },
-        "blocks sharding disabled, ingester gRPC streaming disabled, memcached index cache": {
-            blocksShardingEnabled:    false,
-            ingesterStreamingEnabled: false,
-            indexCacheBackend:        tsdb.IndexCacheBackendMemcached,
+        "blocks sharding disabled, memcached index cache": {
+            blocksShardingEnabled: false,
+            indexCacheBackend:     tsdb.IndexCacheBackendMemcached,
         },
-        "blocks sharding enabled, ingester gRPC streaming enabled, memcached index cache": {
-            blocksShardingEnabled:    true,
-            ingesterStreamingEnabled: true,
-            indexCacheBackend:        tsdb.IndexCacheBackendMemcached,
+        "blocks sharding enabled, memcached index cache": {
+            blocksShardingEnabled: true,
+            indexCacheBackend:     tsdb.IndexCacheBackendMemcached,
         },
-        "blocks sharding enabled, ingester gRPC streaming enabled, memcached index cache, bucket index enabled": {
-            blocksShardingEnabled:    true,
-            ingesterStreamingEnabled: true,
-            indexCacheBackend:        tsdb.IndexCacheBackendMemcached,
-            bucketIndexEnabled:       true,
+        "blocks sharding enabled, memcached index cache, bucket index enabled": {
+            blocksShardingEnabled: true,
+            indexCacheBackend:     tsdb.IndexCacheBackendMemcached,
+            bucketIndexEnabled:    true,
         },
-        "blocks sharding disabled, ingester gRPC streaming disabled, redis index cache": {
-            blocksShardingEnabled:    false,
-            ingesterStreamingEnabled: false,
-            indexCacheBackend:        tsdb.IndexCacheBackendRedis,
+        "blocks sharding disabled, redis index cache": {
+            blocksShardingEnabled: false,
+            indexCacheBackend:     tsdb.IndexCacheBackendRedis,
         },
-        "blocks sharding enabled, ingester gRPC streaming enabled, redis index cache": {
-            blocksShardingEnabled:    true,
-            ingesterStreamingEnabled: true,
-            indexCacheBackend:        tsdb.IndexCacheBackendRedis,
+        "blocks sharding enabled, redis index cache": {
+            blocksShardingEnabled: true,
+            indexCacheBackend:     tsdb.IndexCacheBackendRedis,
         },
-        "blocks sharding enabled, ingester gRPC streaming enabled, redis index cache, bucket index enabled": {
-            blocksShardingEnabled:    true,
-            ingesterStreamingEnabled: true,
-            indexCacheBackend:        tsdb.IndexCacheBackendRedis,
-            bucketIndexEnabled:       true,
+        "blocks sharding enabled, redis index cache, bucket index enabled": {
+            blocksShardingEnabled: true,
+            indexCacheBackend:     tsdb.IndexCacheBackendRedis,
+            bucketIndexEnabled:    true,
         },
     }
@@ -398,7 +367,6 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
         "-blocks-storage.tsdb.retention-period":             ((blockRangePeriod * 2) - 1).String(),
         "-blocks-storage.bucket-store.index-cache.backend":  testCfg.indexCacheBackend,
         "-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled),
-        "-querier.ingester-streaming":                       strconv.FormatBool(testCfg.ingesterStreamingEnabled),
         "-querier.query-store-for-labels-enabled":           "true",
         "-querier.thanos-engine":                            strconv.FormatBool(thanosEngine),
         // Ingester.
@@ -1041,7 +1009,6 @@ func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) {
         "-blocks-storage.tsdb.ship-interval":       "1s",
         "-blocks-storage.bucket-store.sync-interval": "1s",
         "-blocks-storage.tsdb.retention-period":     ((blockRangePeriod * 2) - 1).String(),
-        "-querier.ingester-streaming":               "true",
         "-querier.query-store-for-labels-enabled":   "true",
         "-querier.max-fetched-series-per-query":     "3",
     })
diff --git a/pkg/chunk/chunk_test.go b/pkg/chunk/chunk_test.go
deleted file mode 100644
index 7030a76cdc..0000000000
--- a/pkg/chunk/chunk_test.go
+++ /dev/null
@@ -1,105 +0,0 @@
-package chunk
-
-import (
-    "context"
-    "sort"
-    "testing"
-    "time"
-
-    "github.com/prometheus/common/model"
-    "github.com/prometheus/prometheus/model/labels"
-    "github.com/stretchr/testify/require"
-
-    "github.com/cortexproject/cortex/pkg/chunk/encoding"
-    "github.com/cortexproject/cortex/pkg/util"
-)
-
-func init() {
-    encoding.DefaultEncoding = encoding.PrometheusXorChunk
-}
-
-var labelsForDummyChunks = labels.Labels{
-    {Name: labels.MetricName, Value: "foo"},
-    {Name: "bar", Value: "baz"},
-    {Name: "toms", Value: "code"},
-}
-
-func dummyChunkFor(now model.Time, metric labels.Labels) Chunk {
-    c, _ := encoding.NewForEncoding(encoding.DefaultEncoding)
-    chunkStart := now.Add(-time.Hour)
-
-    t := 15 * time.Second
-    nc, err := c.Add(model.SamplePair{Timestamp: chunkStart.Add(t), Value: model.SampleValue(1)})
-    if err != nil {
-        panic(err)
-    }
-    if nc != nil {
-        panic("returned chunk was not nil")
-    }
-
-    chunk := NewChunk(
-        metric,
-        c,
-        chunkStart,
-        now,
-    )
-    return chunk
-}
-
-func TestChunksToMatrix(t *testing.T) {
-    // Create 2 chunks which have the same metric
-    now := model.Now()
-    chunk1 := dummyChunkFor(now, labelsForDummyChunks)
-    chunk1Samples, err := chunk1.Samples(chunk1.From, chunk1.Through)
-    require.NoError(t, err)
-    chunk2 := dummyChunkFor(now, labelsForDummyChunks)
-    chunk2Samples, err := chunk2.Samples(chunk2.From, chunk2.Through)
-    require.NoError(t, err)
-
-    ss1 := &model.SampleStream{
-        Metric: util.LabelsToMetric(chunk1.Metric),
-        Values: util.MergeSampleSets(chunk1Samples, chunk2Samples),
-    }
-
-    // Create another chunk with a different metric
-    otherMetric := labels.Labels{
-        {Name: model.MetricNameLabel, Value: "foo2"},
-        {Name: "bar", Value: "baz"},
-        {Name: "toms", Value: "code"},
-    }
-    chunk3 := dummyChunkFor(now, otherMetric)
-    chunk3Samples, err := chunk3.Samples(chunk3.From, chunk3.Through)
-    require.NoError(t, err)
-
-    ss2 := &model.SampleStream{
-        Metric: util.LabelsToMetric(chunk3.Metric),
-        Values: chunk3Samples,
-    }
-
-    for _, c := range []struct {
-        chunks         []Chunk
-        expectedMatrix model.Matrix
-    }{
-        {
-            chunks:         []Chunk{},
-            expectedMatrix: model.Matrix{},
-        }, {
-            chunks: []Chunk{
-                chunk1,
-                chunk2,
-                chunk3,
-            },
-            expectedMatrix: model.Matrix{
-                ss1,
-                ss2,
-            },
-        },
-    } {
-        matrix, err := ChunksToMatrix(context.Background(), c.chunks, chunk1.From, chunk3.Through)
-        require.NoError(t, err)
-
-        sort.Sort(matrix)
-        sort.Sort(c.expectedMatrix)
-        require.Equal(t, c.expectedMatrix, matrix)
-    }
-}
diff --git a/pkg/chunk/fixtures.go b/pkg/chunk/fixtures.go
index 576040328b..9227415db0 100644
--- a/pkg/chunk/fixtures.go
+++ b/pkg/chunk/fixtures.go
@@ -3,12 +3,8 @@ package chunk
 // Chunk functions used only in tests

 import (
-    "context"
-
     "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/model/labels"
-
-    "github.com/cortexproject/cortex/pkg/util"
 )

 // BenchmarkLabels is a real example from Kubernetes' embedded cAdvisor metrics, lightly obfuscated
@@ -31,31 +27,3 @@ var BenchmarkLabels = labels.Labels{
     {Name: "namespace", Value: "kube-system"},
     {Name: "pod_name", Value: "some-other-name-5j8s8"},
 }
-
-// ChunksToMatrix converts a set of chunks to a model.Matrix.
-func ChunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error) {
-    // Group chunks by series, sort and dedupe samples.
-    metrics := map[model.Fingerprint]model.Metric{}
-    samplesBySeries := map[model.Fingerprint][][]model.SamplePair{}
-    for _, c := range chunks {
-        ss, err := c.Samples(from, through)
-        if err != nil {
-            return nil, err
-        }
-
-        metric := util.LabelsToMetric(c.Metric)
-        fingerprint := metric.Fingerprint()
-        metrics[fingerprint] = metric
-        samplesBySeries[fingerprint] = append(samplesBySeries[fingerprint], ss)
-    }
-
-    matrix := make(model.Matrix, 0, len(samplesBySeries))
-    for fp, ss := range samplesBySeries {
-        matrix = append(matrix, &model.SampleStream{
-            Metric: metrics[fp],
-            Values: util.MergeNSampleSets(ss...),
-        })
-    }
-
-    return matrix, nil
-}
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 8b4e933daf..00e9f835d1 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -1019,11 +1019,7 @@ func TestDistributor_PushQuery(t *testing.T) {
             assert.Equal(t, &cortexpb.WriteResponse{}, writeResponse)
             assert.Nil(t, err)

-            response, err := ds[0].Query(ctx, 0, 10, tc.matchers...)
-            sort.Sort(response)
-            assert.Equal(t, tc.expectedResponse, response)
-            assert.Equal(t, tc.expectedError, err)
-
+            var response model.Matrix
             series, err := ds[0].QueryStream(ctx, 0, 10, tc.matchers...)
             assert.Equal(t, tc.expectedError, err)

@@ -1040,7 +1036,6 @@ func TestDistributor_PushQuery(t *testing.T) {
             // if all other ones are successful, so we're good either has been queried X or X-1
             // ingesters.
             if tc.expectedError == nil {
-                assert.Contains(t, []int{tc.expectedIngesters, tc.expectedIngesters - 1}, countMockIngestersCalls(ingesters, "Query"))
                 assert.Contains(t, []int{tc.expectedIngesters, tc.expectedIngesters - 1}, countMockIngestersCalls(ingesters, "QueryStream"))
             }
         })
@@ -2038,10 +2033,7 @@ func TestSlowQueries(t *testing.T) {
                 shardByAllLabels: shardByAllLabels,
             })

-            _, err := ds[0].Query(ctx, 0, 10, nameMatcher)
-            assert.Equal(t, expectedErr, err)
-
-            _, err = ds[0].QueryStream(ctx, 0, 10, nameMatcher)
+            _, err := ds[0].QueryStream(ctx, 0, 10, nameMatcher)
             assert.Equal(t, expectedErr, err)
         })
     }
diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go
index 3048b1c471..dd4180f36a 100644
--- a/pkg/distributor/query.go
+++ b/pkg/distributor/query.go
@@ -23,33 +23,6 @@ import (
     "github.com/cortexproject/cortex/pkg/util/validation"
 )

-// Query multiple ingesters and returns a Matrix of samples.
-func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
-    var matrix model.Matrix
-    err := instrument.CollectedRequest(ctx, "Distributor.Query", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
-        req, err := ingester_client.ToQueryRequest(from, to, matchers)
-        if err != nil {
-            return err
-        }
-
-        replicationSet, err := d.GetIngestersForQuery(ctx, matchers...)
-        if err != nil {
-            return err
-        }
-
-        matrix, err = d.queryIngesters(ctx, replicationSet, req)
-        if err != nil {
-            return err
-        }
-
-        if s := opentracing.SpanFromContext(ctx); s != nil {
-            s.LogKV("series", len(matrix))
-        }
-        return nil
-    })
-    return matrix, err
-}
-
 func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*ingester_client.ExemplarQueryResponse, error) {
     var result *ingester_client.ExemplarQueryResponse
     err := instrument.CollectedRequest(ctx, "Distributor.QueryExemplars", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
@@ -157,52 +130,6 @@ func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.Replica
     return d.ingestersRing.GetReplicationSetForOperation(ring.Read)
 }

-// queryIngesters queries the ingesters via the older, sample-based API.
-func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (model.Matrix, error) {
-    // Fetch samples from multiple ingesters in parallel, using the replicationSet
-    // to deal with consistency.
-    results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
-        client, err := d.ingesterPool.GetClientFor(ing.Addr)
-        if err != nil {
-            return nil, err
-        }
-
-        resp, err := client.(ingester_client.IngesterClient).Query(ctx, req)
-        d.ingesterQueries.WithLabelValues(ing.Addr).Inc()
-        if err != nil {
-            d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
-            return nil, err
-        }
-
-        return ingester_client.FromQueryResponse(resp), nil
-    })
-    if err != nil {
-        return nil, err
-    }
-
-    // Merge the results into a single matrix.
-    fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
-    for _, result := range results {
-        for _, ss := range result.(model.Matrix) {
-            fp := ss.Metric.Fingerprint()
-            mss, ok := fpToSampleStream[fp]
-            if !ok {
-                mss = &model.SampleStream{
-                    Metric: ss.Metric,
-                }
-                fpToSampleStream[fp] = mss
-            }
-            mss.Values = util.MergeSampleSets(mss.Values, ss.Values)
-        }
-    }
-    result := model.Matrix{}
-    for _, ss := range fpToSampleStream {
-        result = append(result, ss)
-    }
-
-    return result, nil
-}
-
 // mergeExemplarSets merges and dedupes two sets of already sorted exemplar pairs.
 // Both a and b should be lists of exemplars from the same series.
 // Defined here instead of pkg/util to avoid a import cycle.
diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go
index 10e4db4326..d78af05879 100644
--- a/pkg/querier/distributor_queryable.go
+++ b/pkg/querier/distributor_queryable.go
@@ -26,7 +26,6 @@ import (
 // Distributor is the read interface to the distributor, made an interface here
 // to reduce package coupling.
 type Distributor interface {
-    Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error)
     QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error)
     QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*client.ExemplarQueryResponse, error)
     LabelValuesForLabelName(ctx context.Context, from, to model.Time, label model.LabelName, matchers ...*labels.Matcher) ([]string, error)
@@ -38,10 +37,9 @@ type Distributor interface {
     MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
 }

-func newDistributorQueryable(distributor Distributor, streaming bool, streamingMetdata bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration, queryStoreForLabels bool) QueryableWithFilter {
+func newDistributorQueryable(distributor Distributor, streamingMetdata bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration, queryStoreForLabels bool) QueryableWithFilter {
     return distributorQueryable{
         distributor:          distributor,
-        streaming:            streaming,
         streamingMetdata:     streamingMetdata,
         iteratorFn:           iteratorFn,
         queryIngestersWithin: queryIngestersWithin,
@@ -51,7 +49,6 @@

 type distributorQueryable struct {
     distributor          Distributor
-    streaming            bool
     streamingMetdata     bool
     iteratorFn           chunkIteratorFunc
     queryIngestersWithin time.Duration
@@ -63,7 +60,6 @@ func (d distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error)
         distributor:          d.distributor,
         mint:                 mint,
         maxt:                 maxt,
-        streaming:            d.streaming,
         streamingMetadata:    d.streamingMetdata,
         chunkIterFn:          d.iteratorFn,
         queryIngestersWithin: d.queryIngestersWithin,
@@ -79,7 +75,6 @@ func (d distributorQueryable) UseQueryable(now time.Time, _, queryMaxT int64) bo
 type distributorQuerier struct {
     distributor          Distributor
     mint, maxt           int64
-    streaming            bool
     streamingMetadata    bool
     chunkIterFn          chunkIteratorFunc
     queryIngestersWithin time.Duration
@@ -144,17 +139,7 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st
         return series.MetricsToSeriesSet(sortSeries, ms)
     }

-    if q.streaming {
-        return q.streamingSelect(ctx, sortSeries, minT, maxT, matchers)
-    }
-
-    matrix, err := q.distributor.Query(ctx, model.Time(minT), model.Time(maxT), matchers...)
-    if err != nil {
-        return storage.ErrSeriesSet(err)
-    }
-
-    // Using MatrixToSeriesSet (and in turn NewConcreteSeriesSet), sorts the series.
-    return series.MatrixToSeriesSet(sortSeries, matrix)
+    return q.streamingSelect(ctx, sortSeries, minT, maxT, matchers)
 }

 func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries bool, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet {
diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go
index 739c37f760..2d9704b1ce 100644
--- a/pkg/querier/distributor_queryable_test.go
+++ b/pkg/querier/distributor_queryable_test.go
@@ -29,46 +29,6 @@ const (
     mint, maxt = 0, 10
 )

-func TestDistributorQuerier(t *testing.T) {
-    t.Parallel()
-
-    d := &MockDistributor{}
-    d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
-        model.Matrix{
-            // Matrixes are unsorted, so this tests that the labels get sorted.
-            &model.SampleStream{
-                Metric: model.Metric{
-                    "foo": "bar",
-                },
-            },
-            &model.SampleStream{
-                Metric: model.Metric{
-                    "bar": "baz",
-                },
-            },
-        },
-        nil)
-
-    queryable := newDistributorQueryable(d, false, false, nil, 0, false)
-    querier, err := queryable.Querier(mint, maxt)
-    require.NoError(t, err)
-
-    ctx := context.Background()
-    seriesSet := querier.Select(ctx, true, &storage.SelectHints{Start: mint, End: maxt})
-    require.NoError(t, seriesSet.Err())
-
-    require.True(t, seriesSet.Next())
-    series := seriesSet.At()
-    require.Equal(t, labels.Labels{{Name: "bar", Value: "baz"}}, series.Labels())
-
-    require.True(t, seriesSet.Next())
-    series = seriesSet.At()
-    require.Equal(t, labels.Labels{{Name: "foo", Value: "bar"}}, series.Labels())
-
-    require.False(t, seriesSet.Next())
-    require.NoError(t, seriesSet.Err())
-}
-
 func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) {
     now := time.Now()
@@ -129,20 +89,19 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
         },
     }

-    for _, streamingEnabled := range []bool{false, true} {
+    for _, streamingMetadataEnabled := range []bool{false, true} {
         for testName, testData := range tests {
             testData := testData
-            t.Run(fmt.Sprintf("%s (streaming enabled: %t)", testName, streamingEnabled), func(t *testing.T) {
+            t.Run(fmt.Sprintf("%s (streaming metadata enabled: %t)", testName, streamingMetadataEnabled), func(t *testing.T) {
                 t.Parallel()

                 distributor := &MockDistributor{}
-                distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
                 distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)
                 distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil)
                 distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil)

                 ctx := user.InjectOrgID(context.Background(), "test")
-                queryable := newDistributorQueryable(distributor, streamingEnabled, streamingEnabled, nil, testData.queryIngestersWithin, testData.queryStoreForLabels)
+                queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, nil, testData.queryIngestersWithin, testData.queryStoreForLabels)
                 querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT)
                 require.NoError(t, err)
@@ -181,7 +140,7 @@ func TestDistributorQueryableFilter(t *testing.T) {
     t.Parallel()

     d := &MockDistributor{}
-    dq := newDistributorQueryable(d, false, false, nil, 1*time.Hour, true)
+    dq := newDistributorQueryable(d, false, nil, 1*time.Hour, true)

     now := time.Now()
@@ -231,7 +190,7 @@ func TestIngesterStreaming(t *testing.T) {
         nil)

     ctx := user.InjectOrgID(context.Background(), "0")
-    queryable := newDistributorQueryable(d, true, true, mergeChunks, 0, true)
+    queryable := newDistributorQueryable(d, true, mergeChunks, 0, true)
     querier, err := queryable.Querier(mint, maxt)
     require.NoError(t, err)
@@ -309,7 +268,7 @@ func TestIngesterStreamingMixedResults(t *testing.T) {
         nil)

     ctx := user.InjectOrgID(context.Background(), "0")
-    queryable := newDistributorQueryable(d, true, true, mergeChunks, 0, true)
+    queryable := newDistributorQueryable(d, true, mergeChunks, 0, true)
     querier, err := queryable.Querier(mint, maxt)
     require.NoError(t, err)
@@ -365,7 +324,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) {
             d.On("MetricsForLabelMatchersStream", mock.Anything, model.Time(mint), model.Time(maxt), someMatchers).
                 Return(metrics, nil)

-            queryable := newDistributorQueryable(d, false, streamingEnabled, nil, 0, true)
+            queryable := newDistributorQueryable(d, streamingEnabled, nil, 0, true)
             querier, err := queryable.Querier(mint, maxt)
             require.NoError(t, err)
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index d453330b7a..5938b976e6 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -46,7 +46,7 @@ type Config struct {
     Timeout        time.Duration `yaml:"timeout"`
     Iterators      bool          `yaml:"iterators"`
     BatchIterators bool          `yaml:"batch_iterators"`
-    IngesterStreaming         bool `yaml:"ingester_streaming"`
+    IngesterStreaming         bool `yaml:"ingester_streaming" doc:"hidden"`
     IngesterMetadataStreaming bool `yaml:"ingester_metadata_streaming"`
     MaxSamples           int           `yaml:"max_samples"`
     QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
@@ -100,13 +100,14 @@ var (
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     //lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods
     flagext.DeprecatedFlag(f, "querier.at-modifier-enabled", "This flag is no longer functional; at-modifier is always enabled now.", util_log.Logger)
+    //lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods
+    flagext.DeprecatedFlag(f, "querier.ingester-streaming", "Deprecated: Use streaming RPCs to query ingester. QueryStream is always enabled and the flag is not effective anymore.", util_log.Logger)
     cfg.StoreGatewayClient.RegisterFlagsWithPrefix("querier.store-gateway-client", f)
     f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
     f.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.")
     f.BoolVar(&cfg.Iterators, "querier.iterators", false, "Use iterators to execute query, as opposed to fully materialising the series in memory.")
     f.BoolVar(&cfg.BatchIterators, "querier.batch-iterators", true, "Use batch iterators to execute query, as opposed to fully materialising the series in memory. Takes precedent over the -querier.iterators flag.")
-    f.BoolVar(&cfg.IngesterStreaming, "querier.ingester-streaming", true, "Use streaming RPCs to query ingester.")
     f.BoolVar(&cfg.IngesterMetadataStreaming, "querier.ingester-metadata-streaming", false, "Use streaming RPCs for metadata APIs from ingester.")
     f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.")
     f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
@@ -164,7 +165,7 @@ func getChunksIteratorFunction(cfg Config) chunkIteratorFunc {
 func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, v1.QueryEngine) {
     iteratorFunc := getChunksIteratorFunction(cfg)

-    distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterStreaming, cfg.IngesterMetadataStreaming, iteratorFunc, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)
+    distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, iteratorFunc, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)

     ns := make([]QueryableWithFilter, len(stores))
     for ix, s := range stores {
diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go
index 4e54f4d176..fbb868d0e8 100644
--- a/pkg/querier/querier_test.go
+++ b/pkg/querier/querier_test.go
@@ -181,11 +181,6 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {
     }

     db, samples := mockTSDB(t, labelsSets, model.Time(start.Unix()*1000), int(chunks*samplesPerChunk), sampleRate, chunkOffset, int(samplesPerChunk))
-    samplePairs := []model.SamplePair{}
-
-    for _, s := range samples {
-        samplePairs = append(samplePairs, model.SamplePair{Timestamp: model.Time(s.TimestampMs), Value: model.SampleValue(s.Value)})
-    }

     distributor := &MockDistributor{}
@@ -208,21 +203,8 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {
         },
     }

-    unorderedResponseMatrix := model.Matrix{
-        {
-            Metric: util.LabelsToMetric(cortexpb.FromLabelAdaptersToLabels(unorderedResponse.Timeseries[0].Labels)),
-            Values: samplePairs,
-        },
-        {
-            Metric: util.LabelsToMetric(cortexpb.FromLabelAdaptersToLabels(unorderedResponse.Timeseries[1].Labels)),
-            Values: samplePairs,
-        },
-    }
-
     distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&unorderedResponse, nil)
-    distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(unorderedResponseMatrix, nil)
-    distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)
-    distributorQueryable := newDistributorQueryable(distributor, false, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)
+    distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)

     tCases := []struct {
         name                 string
@@ -232,19 +214,19 @@
     }{
         {
             name:                 "should sort if querying 2 queryables",
-            distributorQueryable: distributorQueryableStreaming,
+            distributorQueryable: distributorQueryable,
             storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(db)},
             sorted:               true,
         },
         {
             name:                 "should not sort if querying only ingesters",
-            distributorQueryable: distributorQueryableStreaming,
+            distributorQueryable: distributorQueryable,
             storeQueriables:      []QueryableWithFilter{UseBeforeTimestampQueryable(db, start.Add(-1*time.Hour))},
             sorted:               false,
         },
         {
             name:                 "should not sort if querying only stores",
-            distributorQueryable: UseBeforeTimestampQueryable(distributorQueryableStreaming, start.Add(-1*time.Hour)),
+            distributorQueryable: UseBeforeTimestampQueryable(distributorQueryable, start.Add(-1*time.Hour)),
             storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(db)},
             sorted:               false,
         },
@@ -254,24 +236,12 @@
             storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(db)},
             sorted:               true,
         },
-        {
-            name:                 "should not sort if querying only ingesters with streaming off",
-            distributorQueryable: distributorQueryable,
-            storeQueriables:      []QueryableWithFilter{UseBeforeTimestampQueryable(db, start.Add(-1*time.Hour))},
-            sorted:               false,
-        },
-        {
-            name:                 "should not sort if querying only stores with streaming off",
-            distributorQueryable: UseBeforeTimestampQueryable(distributorQueryable, start.Add(-1*time.Hour)),
-            storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(db)},
-            sorted:               false,
-        },
     }

     for _, tc := range tCases {
         for _, thanosEngine := range []bool{false, true} {
             thanosEngine := thanosEngine
-            t.Run(tc.name+fmt.Sprintf(", thanos engine: %s", strconv.FormatBool(thanosEngine)), func(t *testing.T) {
+            t.Run(tc.name+fmt.Sprintf("thanos engine: %s", strconv.FormatBool(thanosEngine)), func(t *testing.T) {
                 wDistributorQueriable := &wrappedSampleAndChunkQueryable{QueryableWithFilter: tc.distributorQueryable}
                 var wQueriables []QueryableWithFilter
                 for _, queryable := range tc.storeQueriables {
@@ -407,7 +377,7 @@ func TestLimits(t *testing.T) {
         response:  &streamResponse,
     }

-    distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)
+    distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)

     tCases := []struct {
         name                 string
@@ -526,35 +496,32 @@ func TestQuerier(t *testing.T) {
     for _, thanosEngine := range []bool{false, true} {
         for _, query := range queries {
             for _, encoding := range encodings {
-                for _, streaming := range []bool{false, true} {
-                    for _, iterators := range []bool{false, true} {
-                        iterators := iterators
-                        t.Run(fmt.Sprintf("%s/%s/streaming=%t/iterators=%t", query.query, encoding.name, streaming, iterators), func(t *testing.T) {
-                            var queryEngine v1.QueryEngine
-                            if thanosEngine {
-                                queryEngine = engine.New(engine.Opts{
-                                    EngineOpts:        opts,
-                                    LogicalOptimizers: logicalplan.AllOptimizers,
-                                })
-                            } else {
-                                queryEngine = promql.NewEngine(opts)
-                            }
-                            cfg.IngesterStreaming = streaming
-                            cfg.Iterators = iterators
-                            // Disable active query tracker to avoid mmap error.
- cfg.ActiveQueryTrackerDir = "" - - chunkStore, through := makeMockChunkStore(t, chunks, encoding.e) - distributor := mockDistibutorFor(t, chunkStore, through) - - overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil) - require.NoError(t, err) - - queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore)), UseAlwaysQueryable(db)} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger()) - testRangeQuery(t, queryable, queryEngine, through, query) - }) - } + for _, iterators := range []bool{false, true} { + iterators := iterators + t.Run(fmt.Sprintf("%s/%s/iterators=%t", query.query, encoding.name, iterators), func(t *testing.T) { + var queryEngine v1.QueryEngine + if thanosEngine { + queryEngine = engine.New(engine.Opts{ + EngineOpts: opts, + LogicalOptimizers: logicalplan.AllOptimizers, + }) + } else { + queryEngine = promql.NewEngine(opts) + } + cfg.Iterators = iterators + // Disable active query tracker to avoid mmap error. + cfg.ActiveQueryTrackerDir = "" + + chunkStore, through := makeMockChunkStore(t, chunks, encoding.e) + distributor := mockDistibutorFor(t, chunkStore, through) + + overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil) + require.NoError(t, err) + + queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore)), UseAlwaysQueryable(db)} + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger()) + testRangeQuery(t, queryable, queryEngine, through, query) + }) } } } @@ -677,51 +644,48 @@ func TestNoHistoricalQueryToIngester(t *testing.T) { cfg := Config{} // Disable active query tracker to avoid mmap error. cfg.ActiveQueryTrackerDir = "" - for _, ingesterStreaming := range []bool{true, false} { - for _, thanosEngine := range []bool{true, false} { - cfg.IngesterStreaming = ingesterStreaming - for _, c := range testCases { - cfg.QueryIngestersWithin = c.queryIngestersWithin - t.Run(fmt.Sprintf("IngesterStreaming=%t,thanosEngine=%t,queryIngestersWithin=%v, test=%s", cfg.IngesterStreaming, thanosEngine, c.queryIngestersWithin, c.name), func(t *testing.T) { - var queryEngine v1.QueryEngine - if thanosEngine { - queryEngine = engine.New(engine.Opts{ - EngineOpts: opts, - LogicalOptimizers: logicalplan.AllOptimizers, - }) - } else { - queryEngine = promql.NewEngine(opts) - } + for _, thanosEngine := range []bool{true, false} { + for _, c := range testCases { + cfg.QueryIngestersWithin = c.queryIngestersWithin + t.Run(fmt.Sprintf("thanosEngine=%t,queryIngestersWithin=%v, test=%s", thanosEngine, c.queryIngestersWithin, c.name), func(t *testing.T) { + var queryEngine v1.QueryEngine + if thanosEngine { + queryEngine = engine.New(engine.Opts{ + EngineOpts: opts, + LogicalOptimizers: logicalplan.AllOptimizers, + }) + } else { + queryEngine = promql.NewEngine(opts) + } - chunkStore, _ := makeMockChunkStore(t, 24, encodings[0].e) - distributor := &errDistributor{} + chunkStore, _ := makeMockChunkStore(t, 24, encodings[0].e) + distributor := &errDistributor{} - overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil) - require.NoError(t, err) + overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil) + require.NoError(t, err) - ctx := user.InjectOrgID(context.Background(), "0") - queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))}, nil, log.NewNopLogger()) - query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, 
"dummy", c.mint, c.maxt, 1*time.Minute) - require.NoError(t, err) + ctx := user.InjectOrgID(context.Background(), "0") + queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))}, nil, log.NewNopLogger()) + query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, "dummy", c.mint, c.maxt, 1*time.Minute) + require.NoError(t, err) - r := query.Exec(ctx) - _, err = r.Matrix() + r := query.Exec(ctx) + _, err = r.Matrix() - if c.hitIngester { - // If the ingester was hit, the distributor always returns errDistributorError. Prometheus - // wrap any Select() error into "expanding series", so we do wrap it as well to have a match. - require.Error(t, err) - if !thanosEngine { - require.Equal(t, errors.Wrap(errDistributorError, "expanding series").Error(), err.Error()) - } else { - require.Equal(t, errDistributorError.Error(), err.Error()) - } + if c.hitIngester { + // If the ingester was hit, the distributor always returns errDistributorError. Prometheus + // wrap any Select() error into "expanding series", so we do wrap it as well to have a match. + require.Error(t, err) + if !thanosEngine { + require.Equal(t, errors.Wrap(errDistributorError, "expanding series").Error(), err.Error()) } else { - // If the ingester was hit, there would have been an error from errDistributor. - require.NoError(t, err) + require.Equal(t, errDistributorError.Error(), err.Error()) } - }) - } + } else { + // If the ingester was hit, there would have been an error from errDistributor. + require.NoError(t, err) + } + }) } } } @@ -781,46 +745,42 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) { // Disable active query tracker to avoid mmap error. cfg.ActiveQueryTrackerDir = "" - for _, ingesterStreaming := range []bool{true, false} { - cfg.IngesterStreaming = ingesterStreaming - for name, c := range tests { - cfg.MaxQueryIntoFuture = c.maxQueryIntoFuture - t.Run(fmt.Sprintf("%s (ingester streaming enabled = %t)", name, cfg.IngesterStreaming), func(t *testing.T) { - queryEngine := promql.NewEngine(opts) + for name, c := range tests { + cfg.MaxQueryIntoFuture = c.maxQueryIntoFuture + t.Run(name, func(t *testing.T) { + queryEngine := promql.NewEngine(opts) - // We don't need to query any data for this test, so an empty store is fine. - chunkStore := &emptyChunkStore{} - distributor := &MockDistributor{} - distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) - distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) + // We don't need to query any data for this test, so an empty store is fine. 
+            chunkStore := &emptyChunkStore{}
+            distributor := &MockDistributor{}
+            distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)

-            overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
-            require.NoError(t, err)
+            overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
+            require.NoError(t, err)

-            ctx := user.InjectOrgID(context.Background(), "0")
-            queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))}
-            queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger())
-            query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, "dummy", c.queryStartTime, c.queryEndTime, time.Minute)
-            require.NoError(t, err)
+            ctx := user.InjectOrgID(context.Background(), "0")
+            queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))}
+            queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger())
+            query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, "dummy", c.queryStartTime, c.queryEndTime, time.Minute)
+            require.NoError(t, err)

-            r := query.Exec(ctx)
-            require.Nil(t, r.Err)
+            r := query.Exec(ctx)
+            require.Nil(t, r.Err)

-            _, err = r.Matrix()
-            require.Nil(t, err)
-
-            if !c.expectedSkipped {
-                // Assert on the time range of the actual executed query (5s delta).
-                delta := float64(5000)
-                require.Len(t, distributor.Calls, 1)
-                assert.InDelta(t, util.TimeToMillis(c.expectedStartTime), int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), delta)
-                assert.InDelta(t, util.TimeToMillis(c.expectedEndTime), int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), delta)
-            } else {
-                // Ensure no query has been executed (because skipped).
-                assert.Len(t, distributor.Calls, 0)
-            }
-        })
-    }
+            _, err = r.Matrix()
+            require.Nil(t, err)
+
+            if !c.expectedSkipped {
+                // Assert on the time range of the actual executed query (5s delta).
+                delta := float64(5000)
+                require.Len(t, distributor.Calls, 1)
+                assert.InDelta(t, util.TimeToMillis(c.expectedStartTime), int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), delta)
+                assert.InDelta(t, util.TimeToMillis(c.expectedEndTime), int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), delta)
+            } else {
+                // Ensure no query has been executed (because skipped).
+                assert.Len(t, distributor.Calls, 0)
+            }
+        })
     }
 }
@@ -1019,7 +979,6 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
             var cfg Config
             flagext.DefaultValues(&cfg)
-            cfg.IngesterStreaming = ingesterStreaming
             cfg.IngesterMetadataStreaming = ingesterStreaming
             // Disable active query tracker to avoid mmap error.
             cfg.ActiveQueryTrackerDir = ""
@@ -1038,7 +997,6 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
                 return
             }
             distributor := &MockDistributor{}
-            distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
             distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)

             queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger())
@@ -1259,11 +1217,8 @@ func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *Moc
         Labels: []cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "foo"}},
         Chunks: chunks,
     }
-    matrix, err := chunk.ChunksToMatrix(context.Background(), cs.chunks, 0, through)
-    require.NoError(t, err)

     result := &MockDistributor{}
-    result.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(matrix, nil)
     result.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{Chunkseries: []client.TimeSeriesChunk{tsc}}, nil)
     return result
 }
@@ -1296,9 +1251,6 @@ type errDistributor struct{}

 var errDistributorError = fmt.Errorf("errDistributorError")

-func (m *errDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
-    return nil, errDistributorError
-}
 func (m *errDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
     return nil, errDistributorError
 }
@@ -1348,10 +1300,6 @@ func (c *emptyChunkStore) IsCalled() bool {

 type emptyDistributor struct{}

-func (d *emptyDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
-    return nil, nil
-}
-
 func (d *emptyDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
     return &client.QueryStreamResponse{}, nil
 }
@@ -1504,42 +1452,39 @@ func TestShortTermQueryToLTS(t *testing.T) {
     // Disable active query tracker to avoid mmap error.
     cfg.ActiveQueryTrackerDir = ""

-    for _, ingesterStreaming := range []bool{true, false} {
-        cfg.IngesterStreaming = ingesterStreaming
-        for _, c := range testCases {
-            cfg.QueryIngestersWithin = c.queryIngestersWithin
-            cfg.QueryStoreAfter = c.queryStoreAfter
-            t.Run(fmt.Sprintf("IngesterStreaming=%t,test=%s", cfg.IngesterStreaming, c.name), func(t *testing.T) {
-                //parallel testing causes data race
-                chunkStore := &emptyChunkStore{}
-                distributor := &errDistributor{}
+    for _, c := range testCases {
+        cfg.QueryIngestersWithin = c.queryIngestersWithin
+        cfg.QueryStoreAfter = c.queryStoreAfter
+        t.Run(c.name, func(t *testing.T) {
+            //parallel testing causes data race
+            chunkStore := &emptyChunkStore{}
+            distributor := &errDistributor{}

-                overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
-                require.NoError(t, err)
+            overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
+            require.NoError(t, err)

-                queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))}, nil, log.NewNopLogger())
-                ctx := user.InjectOrgID(context.Background(), "0")
-                query, err := engine.NewRangeQuery(ctx, queryable, nil, "dummy", c.mint, c.maxt, 1*time.Minute)
-                require.NoError(t, err)
+            queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))}, nil, log.NewNopLogger())
+            ctx := user.InjectOrgID(context.Background(), "0")
+            query, err := engine.NewRangeQuery(ctx, queryable, nil, "dummy", c.mint, c.maxt, 1*time.Minute)
+            require.NoError(t, err)

-                r := query.Exec(ctx)
-                _, err = r.Matrix()
+            r := query.Exec(ctx)
+            _, err = r.Matrix()

-                if c.hitIngester {
-                    // If the ingester was hit, the distributor always returns errDistributorError. Prometheus
-                    // wrap any Select() error into "expanding series", so we do wrap it as well to have a match.
-                    require.Error(t, err)
-                    require.Equal(t, errors.Wrap(errDistributorError, "expanding series").Error(), err.Error())
-                } else {
-                    // If the ingester was hit, there would have been an error from errDistributor.
-                    require.NoError(t, err)
-                }
+            if c.hitIngester {
+                // If the ingester was hit, the distributor always returns errDistributorError. Prometheus
+                // wrap any Select() error into "expanding series", so we do wrap it as well to have a match.
+                require.Error(t, err)
+                require.Equal(t, errors.Wrap(errDistributorError, "expanding series").Error(), err.Error())
+            } else {
+                // If the ingester was hit, there would have been an error from errDistributor.
+                require.NoError(t, err)
+            }

-                // Verify if the test did/did not hit the LTS
-                time.Sleep(30 * time.Millisecond) // NOTE: Since this is a lazy querier there is a race condition between the response and chunk store being called
-                require.Equal(t, c.hitLTS, chunkStore.IsCalled())
-            })
-        }
+            // Verify if the test did/did not hit the LTS
+            time.Sleep(30 * time.Millisecond) // NOTE: Since this is a lazy querier there is a race condition between the response and chunk store being called
+            require.Equal(t, c.hitLTS, chunkStore.IsCalled())
+        })
     }
 }
diff --git a/pkg/querier/testutils.go b/pkg/querier/testutils.go
index a60e6761f5..6158769b62 100644
--- a/pkg/querier/testutils.go
+++ b/pkg/querier/testutils.go
@@ -20,10 +20,6 @@ type MockDistributor struct {
     mock.Mock
 }

-func (m *MockDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
-    args := m.Called(ctx, from, to, matchers)
-    return args.Get(0).(model.Matrix), args.Error(1)
-}
 func (m *MockDistributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*client.ExemplarQueryResponse, error) {
     args := m.Called(ctx, from, to, matchers)
     return args.Get(0).(*client.ExemplarQueryResponse), args.Error(1)
diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go
index 91863a22d3..f7187df1ec 100644
--- a/pkg/ruler/ruler_test.go
+++ b/pkg/ruler/ruler_test.go
@@ -17,14 +17,11 @@ import (
     "time"
     "unsafe"

-    "github.com/thanos-io/objstore"
-
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
     "github.com/gorilla/mux"
     "github.com/prometheus/client_golang/prometheus"
     prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
-    "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/model/labels"
     "github.com/prometheus/prometheus/model/rulefmt"
     "github.com/prometheus/prometheus/notifier"
@@ -35,12 +32,14 @@ import (
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/require"
+    "github.com/thanos-io/objstore"
     "github.com/weaveworks/common/user"
     "go.uber.org/atomic"
     "google.golang.org/grpc"
     "gopkg.in/yaml.v3"

     "github.com/cortexproject/cortex/pkg/cortexpb"
+    "github.com/cortexproject/cortex/pkg/ingester/client"
     "github.com/cortexproject/cortex/pkg/querier"
     "github.com/cortexproject/cortex/pkg/ring"
     "github.com/cortexproject/cortex/pkg/ring/kv"
@@ -1548,21 +1547,21 @@ func TestRecoverAlertsPostOutage(t *testing.T) {
     downAtActiveAtTime := currentTime.Add(time.Minute * -25)
     downAtActiveSec := downAtActiveAtTime.Unix()
     d := &querier.MockDistributor{}
-    d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
-        model.Matrix{
-            &model.SampleStream{
-                Metric: model.Metric{
-                    labels.MetricName: "ALERTS_FOR_STATE",
-                    // user1's only alert rule
-                    labels.AlertName: model.LabelValue(mockRules["user1"][0].GetRules()[0].Alert),
+
+    d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
+        &client.QueryStreamResponse{
+            Timeseries: []cortexpb.TimeSeries{
+                {
+                    Labels: []cortexpb.LabelAdapter{
+                        {Name: labels.MetricName, Value: "ALERTS_FOR_STATE"},
+                        {Name: labels.AlertName, Value: mockRules["user1"][0].GetRules()[0].Alert},
+                    },
+                    Samples: []cortexpb.Sample{{TimestampMs: downAtTimeMs, Value: float64(downAtActiveSec)}},
                 },
-                Values: []model.SamplePair{{Timestamp: model.Time(downAtTimeMs), Value: model.SampleValue(downAtActiveSec)}},
             },
-        },
-        nil)
+        }, nil)
     d.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Panic("This should not be called for the ruler use-cases.")
     querierConfig := querier.DefaultQuerierConfig()
-    querierConfig.IngesterStreaming = false

     // set up an empty store
     queryables := []querier.QueryableWithFilter{
@@ -1657,14 +1656,14 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
     user3Group1Token := tokenForGroup(user3Group1)

     d := &querier.MockDistributor{}
-    d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
-        model.Matrix{
-            &model.SampleStream{
-                Values: []model.SamplePair{},
+    d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
+        &client.QueryStreamResponse{
+            Timeseries: []cortexpb.TimeSeries{
+                {
+                    Samples: []cortexpb.Sample{},
+                },
             },
         }, nil)
-    querierConfig := querier.DefaultQuerierConfig()
-    querierConfig.IngesterStreaming = false

     ruleGroupDesc := func(user, name, namespace string) *rulespb.RuleGroupDesc {
         return &rulespb.RuleGroupDesc{