diff --git a/CHANGELOG.md b/CHANGELOG.md
index 41dfac4a91c..9c692798ac7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@
 ## master / unreleased
 
+* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
 * [CHANGE] Enable Compactor and Alertmanager in target all. #6204
 * [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
 * [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md
index 41d0f433bf2..7fdfaf89a4d 100644
--- a/docs/blocks-storage/querier.md
+++ b/docs/blocks-storage/querier.md
@@ -126,6 +126,11 @@ querier:
   # CLI flag: -querier.per-step-stats-enabled
   [per_step_stats_enabled: <boolean> | default = false]
 
+  # Use compression for metrics query API or instant and range query APIs.
+  # Supports 'gzip' and '' (disable compression)
+  # CLI flag: -querier.response-compression
+  [response_compression: <string> | default = "gzip"]
+
   # The time after which a metric should be queried from storage and not just
   # ingesters. 0 means all queries are sent to store. When running the blocks
   # storage, if this option is enabled, the time range of the query sent to the
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index a8ff9ef9280..58c3fdec637 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -95,6 +95,11 @@ api:
   # CLI flag: -api.build-info-enabled
   [build_info_enabled: <boolean> | default = false]
 
+  # Choose default codec for querier response serialization. Supports 'json' and
+  # 'protobuf'.
+  # CLI flag: -api.querier-default-codec
+  [querier_default_codec: <string> | default = "json"]
+
 # The server_config configures the HTTP and gRPC server of the launched
 # service(s).
 [server: <server_config>]
@@ -3718,6 +3723,11 @@ The `querier_config` configures the Cortex querier.
 # CLI flag: -querier.per-step-stats-enabled
 [per_step_stats_enabled: <boolean> | default = false]
 
+# Use compression for metrics query API or instant and range query APIs.
+# Supports 'gzip' and '' (disable compression)
+# CLI flag: -querier.response-compression
+[response_compression: <string> | default = "gzip"]
+
 # The time after which a metric should be queried from storage and not just
 # ingesters. 0 means all queries are sent to store. When running the blocks
 # storage, if this option is enabled, the time range of the query sent to the
diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md
index aba6f3695d5..c1627547806 100644
--- a/docs/configuration/v1-guarantees.md
+++ b/docs/configuration/v1-guarantees.md
@@ -115,3 +115,4 @@ Currently experimental features are:
 - String interning for metrics labels - Enable string interning for metrics labels by setting `-ingester.labels-string-interning-enabled` on Ingester.
 - Query-frontend: query rejection (`-frontend.query-rejection.enabled`)
+- Querier: protobuf codec (`-api.querier-default-codec`)
diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go
index a2c81ed95ff..c5a11b612ae 100644
--- a/integration/query_frontend_test.go
+++ b/integration/query_frontend_test.go
@@ -203,6 +203,26 @@ func TestQueryFrontendWithVerticalShardingQueryScheduler(t *testing.T) {
 	})
 }
 
+func TestQueryFrontendProtobufCodec(t *testing.T) {
+	runQueryFrontendTest(t, queryFrontendTestConfig{
+		testMissingMetricName: false,
+		querySchedulerEnabled: true,
+		queryStatsEnabled: true,
+		setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
+			require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
+
+			minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"])
+			require.NoError(t, s.StartAndWaitReady(minio))
+
+			flags = mergeFlags(e2e.EmptyFlags(), map[string]string{
+				"-api.querier-default-codec": "protobuf",
+				"-querier.response-compression": "gzip",
+			})
+			return cortexConfigFile, flags
+		},
+	})
+}
+
 func TestQueryFrontendRemoteRead(t *testing.T) {
 	runQueryFrontendTest(t, queryFrontendTestConfig{
 		remoteReadEnabled: true,
diff --git a/pkg/api/api.go b/pkg/api/api.go
index 5eb13cbb243..92b6e1f3687 100644
--- a/pkg/api/api.go
+++ b/pkg/api/api.go
@@ -12,6 +12,7 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/grafana/regexp"
 	"github.com/klauspost/compress/gzhttp"
+	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/model/relabel"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/util/httputil"
@@ -73,13 +74,20 @@ type Config struct {
 	corsRegexString string `yaml:"cors_origin"`
 
 	buildInfoEnabled bool `yaml:"build_info_enabled"`
+
+	QuerierDefaultCodec string `yaml:"querier_default_codec"`
 }
 
+var (
+	errUnsupportedDefaultCodec = errors.New("unsupported default codec type. Supported types are 'json' and 'protobuf'")
+)
+
 // RegisterFlags adds the flags required to config this to the given FlagSet.
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.ResponseCompression, "api.response-compression-enabled", false, "Use GZIP compression for API responses. Some endpoints serve large YAML or JSON blobs which can benefit from compression.")
 	f.Var(&cfg.HTTPRequestHeadersToLog, "api.http-request-headers-to-log", "Which HTTP Request headers to add to logs")
 	f.BoolVar(&cfg.buildInfoEnabled, "api.build-info-enabled", false, "If enabled, build Info API will be served by query frontend or querier.")
+	f.StringVar(&cfg.QuerierDefaultCodec, "api.querier-default-codec", "json", "Choose default codec for querier response serialization. Supports 'json' and 'protobuf'.")
 	cfg.RegisterFlagsWithPrefix("", f)
 }
 
@@ -90,6 +98,14 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.StringVar(&cfg.corsRegexString, prefix+"server.cors-origin", ".*", `Regex for CORS origin. It is fully anchored. Example: 'https?://(domain1|domain2)\.com'`)
 }
 
+// validate config
+func (cfg *Config) Validate() error {
+	if cfg.QuerierDefaultCodec != "json" && cfg.QuerierDefaultCodec != "protobuf" {
+		return errUnsupportedDefaultCodec
+	}
+	return nil
+}
+
 // Push either wraps the distributor push function as configured or returns the distributor push directly.
func (cfg *Config) wrapDistributorPush(d *distributor.Distributor) push.Func { if cfg.DistributorPushWrapper != nil { diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index 84001dc8ca1..8eff8bd30e0 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -26,6 +26,7 @@ import ( "github.com/weaveworks/common/middleware" "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/querier/codec" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util" ) @@ -231,6 +232,9 @@ func NewQuerierHandler( false, ) + // JSON codec is already installed. Install Protobuf codec to give the option for using either. + api.InstallCodec(codec.ProtobufCodec{}) + router := mux.NewRouter() // Use a separate metric for the querier in order to differentiate requests from the query-frontend when diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 1607bb525cc..03ceb6e3cc3 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -182,6 +182,9 @@ func (c *Config) Validate(log log.Logger) error { return errInvalidHTTPPrefix } + if err := c.API.Validate(); err != nil { + return errors.Wrap(err, "invalid api config") + } if err := c.Storage.Validate(); err != nil { return errors.Wrap(err, "invalid storage config") } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index ed3c61d9488..b8d39daeba0 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -442,9 +442,9 @@ func (t *Cortex) initFlusher() (serv services.Service, err error) { func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) { queryAnalyzer := querysharding.NewQueryAnalyzer() // PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses. - prometheusCodec := queryrange.NewPrometheusCodec(false) + prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) // ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats) - shardedPrometheusCodec := queryrange.NewPrometheusCodec(true) + shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, @@ -472,7 +472,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro queryRangeMiddlewares, instantQueryMiddlewares, prometheusCodec, - instantquery.InstantQueryCodec, + instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec), t.Overrides, queryAnalyzer, t.Cfg.Querier.DefaultEvaluationInterval, diff --git a/pkg/querier/codec/protobuf_codec.go b/pkg/querier/codec/protobuf_codec.go new file mode 100644 index 00000000000..b835d573e8a --- /dev/null +++ b/pkg/querier/codec/protobuf_codec.go @@ -0,0 +1,161 @@ +package codec + +import ( + "github.com/gogo/protobuf/proto" + jsoniter "github.com/json-iterator/go" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/util/stats" + v1 "github.com/prometheus/prometheus/web/api/v1" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querier/tripperware" +) + +type ProtobufCodec struct{} + +func (p ProtobufCodec) ContentType() v1.MIMEType { + return v1.MIMEType{Type: "application", SubType: "x-protobuf"} +} + +func (p ProtobufCodec) CanEncode(resp *v1.Response) bool { + // Errors are 
parsed by default json codec + if resp.Error != "" || resp.Data == nil { + return false + } + return true +} + +func (p ProtobufCodec) Encode(resp *v1.Response) ([]byte, error) { + prometheusQueryResponse, err := createPrometheusQueryResponse(resp) + if err != nil { + return []byte{}, err + } + b, err := proto.Marshal(prometheusQueryResponse) + return b, err +} + +func createPrometheusQueryResponse(resp *v1.Response) (*tripperware.PrometheusResponse, error) { + var data = resp.Data.(*v1.QueryData) + + var queryResult tripperware.PrometheusQueryResult + switch string(data.ResultType) { + case model.ValMatrix.String(): + queryResult.Result = &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: *getMatrixSampleStreams(data), + }, + } + case model.ValVector.String(): + queryResult.Result = &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: *getVectorSamples(data), + }, + } + default: + json := jsoniter.ConfigCompatibleWithStandardLibrary + rawBytes, err := json.Marshal(data) + if err != nil { + return nil, err + } + queryResult.Result = &tripperware.PrometheusQueryResult_RawBytes{RawBytes: rawBytes} + } + + var stats *tripperware.PrometheusResponseStats + if data.Stats != nil { + builtin := data.Stats.Builtin() + stats = &tripperware.PrometheusResponseStats{Samples: getStats(&builtin)} + } + + return &tripperware.PrometheusResponse{ + Status: string(resp.Status), + Data: tripperware.PrometheusData{ + ResultType: string(data.ResultType), + Result: queryResult, + Stats: stats, + }, + ErrorType: string(resp.ErrorType), + Error: resp.Error, + Warnings: resp.Warnings, + }, nil +} + +func getMatrixSampleStreams(data *v1.QueryData) *[]tripperware.SampleStream { + sampleStreamsLen := len(data.Result.(promql.Matrix)) + sampleStreams := make([]tripperware.SampleStream, sampleStreamsLen) + + for i := 0; i < sampleStreamsLen; i++ { + labelsLen := len(data.Result.(promql.Matrix)[i].Metric) + var labels []cortexpb.LabelAdapter + if labelsLen > 0 { + labels = make([]cortexpb.LabelAdapter, labelsLen) + for j := 0; j < labelsLen; j++ { + labels[j] = cortexpb.LabelAdapter{ + Name: data.Result.(promql.Matrix)[i].Metric[j].Name, + Value: data.Result.(promql.Matrix)[i].Metric[j].Value, + } + } + } + + samplesLen := len(data.Result.(promql.Matrix)[i].Floats) + var samples []cortexpb.Sample + if samplesLen > 0 { + samples = make([]cortexpb.Sample, samplesLen) + for j := 0; j < samplesLen; j++ { + samples[j] = cortexpb.Sample{ + Value: data.Result.(promql.Matrix)[i].Floats[j].F, + TimestampMs: data.Result.(promql.Matrix)[i].Floats[j].T, + } + } + } + sampleStreams[i] = tripperware.SampleStream{Labels: labels, Samples: samples} + } + return &sampleStreams +} + +func getVectorSamples(data *v1.QueryData) *[]tripperware.Sample { + vectorSamplesLen := len(data.Result.(promql.Vector)) + vectorSamples := make([]tripperware.Sample, vectorSamplesLen) + + for i := 0; i < vectorSamplesLen; i++ { + labelsLen := len(data.Result.(promql.Vector)[i].Metric) + var labels []cortexpb.LabelAdapter + if labelsLen > 0 { + labels = make([]cortexpb.LabelAdapter, labelsLen) + for j := 0; j < labelsLen; j++ { + labels[j] = cortexpb.LabelAdapter{ + Name: data.Result.(promql.Vector)[i].Metric[j].Name, + Value: data.Result.(promql.Vector)[i].Metric[j].Value, + } + } + } + + vectorSamples[i] = tripperware.Sample{ + Labels: labels, + Sample: &cortexpb.Sample{ + TimestampMs: data.Result.(promql.Vector)[i].T, + Value: data.Result.(promql.Vector)[i].F, + }, + } + } + return 
&vectorSamples +} + +func getStats(builtin *stats.BuiltinStats) *tripperware.PrometheusResponseSamplesStats { + queryableSamplesStatsPerStepLen := len(builtin.Samples.TotalQueryableSamplesPerStep) + queryableSamplesStatsPerStep := make([]*tripperware.PrometheusResponseQueryableSamplesStatsPerStep, queryableSamplesStatsPerStepLen) + for i := 0; i < queryableSamplesStatsPerStepLen; i++ { + queryableSamplesStatsPerStep[i] = &tripperware.PrometheusResponseQueryableSamplesStatsPerStep{ + Value: builtin.Samples.TotalQueryableSamplesPerStep[i].V, + TimestampMs: builtin.Samples.TotalQueryableSamplesPerStep[i].T, + } + } + + statSamples := tripperware.PrometheusResponseSamplesStats{ + TotalQueryableSamples: builtin.Samples.TotalQueryableSamples, + TotalQueryableSamplesPerStep: queryableSamplesStatsPerStep, + PeakSamples: int64(builtin.Samples.PeakSamples), + } + + return &statSamples +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index cdf24ee3db4..4c481cbee4d 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -50,6 +50,9 @@ type Config struct { QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"` EnablePerStepStats bool `yaml:"per_step_stats_enabled"` + // Use compression for metrics query API or instant and range query APIs. + ResponseCompression string `yaml:"response_compression"` + // QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters. QueryStoreAfter time.Duration `yaml:"query_store_after"` MaxQueryIntoFuture time.Duration `yaml:"max_query_into_future"` @@ -90,6 +93,7 @@ var ( errBadLookbackConfigs = errors.New("bad settings, query_store_after >= query_ingesters_within which can result in queries not being sent") errShuffleShardingLookbackLessThanQueryStoreAfter = errors.New("the shuffle-sharding lookback period should be greater or equal than the configured 'query store after'") errEmptyTimeRange = errors.New("empty time range") + errUnsupportedResponseCompression = errors.New("unsupported response compression. Supported compression 'gzip' and '' (disable compression)") ) // RegisterFlags adds the flags required to config this to the given FlagSet. @@ -111,6 +115,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.") f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.") f.BoolVar(&cfg.EnablePerStepStats, "querier.per-step-stats-enabled", false, "Enable returning samples stats per steps in query response.") + f.StringVar(&cfg.ResponseCompression, "querier.response-compression", "gzip", "Use compression for metrics query API or instant and range query APIs. Supports 'gzip' and '' (disable compression)") f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.") f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.") f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. 
When running the blocks storage, if this option is enabled, the time range of the query sent to the store will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.") @@ -133,6 +138,10 @@ func (cfg *Config) Validate() error { } } + if cfg.ResponseCompression != "" && cfg.ResponseCompression != "gzip" { + return errUnsupportedResponseCompression + } + if cfg.ShuffleShardingIngestersLookbackPeriod > 0 { if cfg.ShuffleShardingIngestersLookbackPeriod < cfg.QueryStoreAfter { return errShuffleShardingLookbackLessThanQueryStoreAfter diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index afed40adc40..d83ae20ae91 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -13,6 +13,7 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" + "github.com/prometheus/common/model" "github.com/weaveworks/common/httpgrpc" "google.golang.org/grpc/status" @@ -22,7 +23,7 @@ import ( ) var ( - InstantQueryCodec tripperware.Codec = newInstantQueryCodec() + InstantQueryCodec tripperware.Codec = NewInstantQueryCodec("", "protobuf") json = jsoniter.Config{ EscapeHTML: false, // No HTML in our responses. @@ -33,11 +34,27 @@ var ( type instantQueryCodec struct { tripperware.Codec - now func() time.Time + compression tripperware.Compression + defaultCodecType tripperware.CodecType + now func() time.Time } -func newInstantQueryCodec() instantQueryCodec { - return instantQueryCodec{now: time.Now} +func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) instantQueryCodec { + compression := tripperware.NonCompression // default + if compressionStr == string(tripperware.GzipCompression) { + compression = tripperware.GzipCompression + } + + defaultCodecType := tripperware.JsonCodecType // default + if defaultCodecTypeStr == string(tripperware.ProtobufCodecType) { + defaultCodecType = tripperware.ProtobufCodecType + } + + return instantQueryCodec{ + compression: compression, + defaultCodecType: defaultCodecType, + now: time.Now, + } } func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) { @@ -83,17 +100,36 @@ func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _ } var resp tripperware.PrometheusResponse - if err := json.Unmarshal(buf, &resp); err != nil { + err = tripperware.UnmarshalResponse(r, buf, &resp) + + if err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) } + // protobuf serialization treats empty slices as nil + switch resp.Data.ResultType { + case model.ValMatrix.String(): + if resp.Data.Result.GetMatrix().SampleStreams == nil { + resp.Data.Result.GetMatrix().SampleStreams = []tripperware.SampleStream{} + } + case model.ValVector.String(): + if resp.Data.Result.GetVector().Samples == nil { + resp.Data.Result.GetVector().Samples = []tripperware.Sample{} + } + } + + if resp.Headers == nil { + resp.Headers = []*tripperware.PrometheusResponseHeader{} + } + for h, hv := range r.Header { resp.Headers = append(resp.Headers, &tripperware.PrometheusResponseHeader{Name: h, Values: hv}) } + return &resp, nil } -func (instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Request) (*http.Request, error) { +func (c instantQueryCodec) EncodeRequest(ctx context.Context, r 
tripperware.Request) (*http.Request, error) { promReq, ok := r.(*tripperware.PrometheusRequest) if !ok { return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format") @@ -120,8 +156,7 @@ func (instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Reques } } - // Always ask gzip to the querier - h.Set("Accept-Encoding", "gzip") + tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression) req := &http.Request{ Method: "GET", @@ -152,7 +187,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res resp := http.Response{ Header: http.Header{ - "Content-Type": []string{"application/json"}, + "Content-Type": []string{tripperware.ApplicationJson}, }, Body: io.NopCloser(bytes.NewBuffer(b)), StatusCode: http.StatusOK, diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index 6e2acbfc3c3..0f562909bd6 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -12,10 +12,12 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" + "github.com/weaveworks/common/httpgrpc" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/cortexpb" @@ -105,102 +107,265 @@ func TestRequest(t *testing.T) { } } -func TestGzippedResponse(t *testing.T) { +func TestCompressedResponse(t *testing.T) { t.Parallel() - for _, tc := range []struct { - body string - status int - err error + for i, tc := range []struct { + compression string + jsonBody string + promBody *tripperware.PrometheusResponse + status int + err error }{ { - body: `{"status":"success","data":{"resultType":"string","result":[1,"foo"]}}`, + compression: "gzip", + promBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValString.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"string","result":[1,"foo"]}`), + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{}, + }, status: 200, }, { - body: `error generic 400`, - status: 400, - err: httpgrpc.Errorf(400, "error generic 400"), + compression: `gzip`, + jsonBody: `error generic 400`, + status: 400, + err: httpgrpc.Errorf(400, `error generic 400`), }, { - status: 400, - err: httpgrpc.Errorf(400, ""), + compression: `gzip`, + status: 400, + err: httpgrpc.Errorf(400, ""), }, } { - for _, c := range []bool{true, false} { - c := c - t.Run(fmt.Sprintf("compressed %t [%s]", c, tc.body), func(t *testing.T) { - t.Parallel() - - h := http.Header{ - "Content-Type": []string{"application/json"}, - } - - responseBody := bytes.NewBuffer([]byte(tc.body)) - if c { - h.Set("Content-Encoding", "gzip") - var buf bytes.Buffer - w := gzip.NewWriter(&buf) - _, err := w.Write([]byte(tc.body)) - require.NoError(t, err) - w.Close() - responseBody = &buf - } + t.Run(strconv.Itoa(i), func(t *testing.T) { + t.Parallel() + h := http.Header{} + var b []byte + if tc.promBody != nil { + protobuf, err := proto.Marshal(tc.promBody) + b = protobuf + require.NoError(t, err) + h.Set("Content-Type", tripperware.ApplicationProtobuf) + tc.promBody.Headers = append(tc.promBody.Headers, &tripperware.PrometheusResponseHeader{Name: "Content-Type", Values: 
[]string{tripperware.ApplicationProtobuf}}) + } else { + b = []byte(tc.jsonBody) + h.Set("Content-Type", "application/json") + } - response := &http.Response{ - StatusCode: tc.status, - Header: h, - Body: io.NopCloser(responseBody), - } - r, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil) - require.Equal(t, tc.err, err) + h.Set("Content-Encoding", tc.compression) + if tc.promBody != nil { + tc.promBody.Headers = append(tc.promBody.Headers, &tripperware.PrometheusResponseHeader{Name: "Content-Encoding", Values: []string{"gzip"}}) + } + responseBody := &bytes.Buffer{} + w := gzip.NewWriter(responseBody) + _, err := w.Write(b) + require.NoError(t, err) + w.Close() - if err == nil { - resp, err := json.Marshal(r) - require.NoError(t, err) + response := &http.Response{ + StatusCode: tc.status, + Header: h, + Body: io.NopCloser(responseBody), + } + resp, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil) + require.Equal(t, tc.err, err) - require.Equal(t, tc.body, string(resp)) - } - }) - } + if err == nil { + require.NoError(t, err) + require.Equal(t, tc.promBody, resp) + } + }) } } func TestResponse(t *testing.T) { t.Parallel() for i, tc := range []struct { - body string + jsonBody string + promBody *tripperware.PrometheusResponse }{ { - body: `{"status":"success","data":{"resultType":"string","result":[1,"foo"]}}`, + jsonBody: `{"status":"success","data":{"resultType":"string","result":[1,"foo"]}}`, }, { - body: `{"status":"success","data":{"resultType":"string","result":[1,"foo"],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]],"peakSamples":10}}}}`, + jsonBody: `{"status":"success","data":{"resultType":"string","result":[1,"foo"],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]],"peakSamples":10}}}}`, }, { - body: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1,"137"],[2,"137"]]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]],"peakSamples":10}}}}`, + jsonBody: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1,"137"],[2,"137"]]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]],"peakSamples":10}}}}`, }, { - body: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1,"137"],[2,"137"]]}]}}`, + jsonBody: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1,"137"],[2,"137"]]}]}}`, }, { - body: `{"status":"success","data":{"resultType":"scalar","result":[1,"13"]}}`, + jsonBody: `{"status":"success","data":{"resultType":"scalar","result":[1,"13"]}}`, }, { - body: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"1266464.0146205237"]}]}}`, + jsonBody: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"1266464.0146205237"]}]}}`, }, { - body: testHistogramResponse, + jsonBody: testHistogramResponse, + }, + { + jsonBody: `{"status":"success","data":{"resultType":"string","result":[1,"foo"]}}`, + promBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValString.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: 
[]byte(`{"resultType":"string","result":[1,"foo"]}`), + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{}, + }, + }, + { + jsonBody: `{"status":"success","data":{"resultType":"string","result":[1,"foo"],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]]}}}}`, + promBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValString.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"string","result":[1,"foo"],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]]}}}`), + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{}, + }, + }, + { + jsonBody: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1,"137"],[2,"137"]]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]],"peakSamples":10}}}}`, + promBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 137, TimestampMs: 1000}, + {Value: 137, TimestampMs: 2000}, + }, + }, + }, + }, + }, + }, + Stats: &tripperware.PrometheusResponseStats{ + Samples: &tripperware.PrometheusResponseSamplesStats{ + TotalQueryableSamplesPerStep: []*tripperware.PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 5, TimestampMs: 1536673680000}, + {Value: 5, TimestampMs: 1536673780000}, + }, + TotalQueryableSamples: 10, + PeakSamples: 10, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{}, + }, + }, + { + jsonBody: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1,"137"],[2,"137"]]}]}}`, + promBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 137, TimestampMs: 1000}, + {Value: 137, TimestampMs: 2000}, + }, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{}, + }, + }, + { + jsonBody: `{"status":"success","data":{"resultType":"scalar","result":[1,"13"]}}`, + promBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValString.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"scalar","result":[1,"13"]}`), + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{}, + }, + }, + { + jsonBody: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"1266464.0146205237"]}]}}`, + promBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ 
+ Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{}, + Sample: &cortexpb.Sample{Value: 1266464.0146205237, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{}, + }, }, } { tc := tc t.Run(strconv.Itoa(i), func(t *testing.T) { t.Parallel() - response := &http.Response{ - StatusCode: 200, - Header: http.Header{"Content-Type": []string{"application/json"}}, - Body: io.NopCloser(bytes.NewBuffer([]byte(tc.body))), + var response *http.Response + if tc.promBody != nil { + protobuf, err := proto.Marshal(tc.promBody) + require.NoError(t, err) + response = &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{tripperware.ApplicationProtobuf}}, + Body: io.NopCloser(bytes.NewBuffer(protobuf)), + } + tc.promBody.Headers = append(tc.promBody.Headers, &tripperware.PrometheusResponseHeader{Name: "Content-Type", Values: []string{tripperware.ApplicationProtobuf}}) + } else { + response = &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), + } } + resp, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil) require.NoError(t, err) @@ -208,8 +373,8 @@ func TestResponse(t *testing.T) { response = &http.Response{ StatusCode: 200, Header: http.Header{"Content-Type": []string{"application/json"}}, - Body: io.NopCloser(bytes.NewBuffer([]byte(tc.body))), - ContentLength: int64(len(tc.body)), + Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), + ContentLength: int64(len(tc.jsonBody)), } resp2, err := InstantQueryCodec.EncodeResponse(context.Background(), resp) require.NoError(t, err) @@ -498,6 +663,1021 @@ func TestMergeResponse(t *testing.T) { } } +func TestMergeResponseProtobuf(t *testing.T) { + t.Parallel() + defaultReq := &tripperware.PrometheusRequest{ + Query: "sum(up)", + } + for _, tc := range []struct { + name string + req tripperware.Request + resps []*tripperware.PrometheusResponse + expectedResp string + expectedErr error + cancelBeforeDecode bool + expectedDecodeErr error + cancelBeforeMerge bool + }{ + { + name: "empty response", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: make([]tripperware.Sample, 0), + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"vector","result":[]}}`, + }, + { + name: "empty response with stats", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{}, + }, + }, + }, + Stats: &tripperware.PrometheusResponseStats{ + Samples: &tripperware.PrometheusResponseSamplesStats{ + TotalQueryableSamplesPerStep: []*tripperware.PrometheusResponseQueryableSamplesStatsPerStep{}, + TotalQueryableSamples: 0, + PeakSamples: 10, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ 
+ {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"vector","result":[],"stats":{"samples":{"totalQueryableSamples":0,"totalQueryableSamplesPerStep":[],"peakSamples":10}}}}`, + }, + { + name: "single response", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}]}}`, + }, + { + name: "single response with stats", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + Stats: &tripperware.PrometheusResponseStats{ + Samples: &tripperware.PrometheusResponseSamplesStats{ + TotalQueryableSamplesPerStep: []*tripperware.PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 10, TimestampMs: 1000}, + }, + TotalQueryableSamples: 10, + PeakSamples: 10, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,10]],"peakSamples":10}}}}`, + }, + { + name: "duplicated response", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: 
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}]}}`, + }, + { + name: "duplicated response with stats", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + Stats: &tripperware.PrometheusResponseStats{ + Samples: &tripperware.PrometheusResponseSamplesStats{ + TotalQueryableSamplesPerStep: []*tripperware.PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 10, TimestampMs: 1000}, + }, + TotalQueryableSamples: 10, + PeakSamples: 10, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + Stats: &tripperware.PrometheusResponseStats{ + Samples: &tripperware.PrometheusResponseSamplesStats{ + TotalQueryableSamplesPerStep: []*tripperware.PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 10, TimestampMs: 1000}, + }, + TotalQueryableSamples: 10, + PeakSamples: 10, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}],"stats":{"samples":{"totalQueryableSamples":20,"totalQueryableSamplesPerStep":[[1,20]],"peakSamples":10}}}}`, + }, + { + name: "merge two responses", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "foo"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "bar"}, + }, + Sample: &cortexpb.Sample{Value: 2, TimestampMs: 2000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: 
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[2,"2"]},{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`, + }, + { + name: "merge two responses with sort", + req: &tripperware.PrometheusRequest{Query: "sort(sum by (job) (up))"}, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "foo"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "bar"}, + }, + Sample: &cortexpb.Sample{Value: 2, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]},{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`, + }, + { + name: "merge two responses with sort_desc", + req: &tripperware.PrometheusRequest{Query: "sort_desc(sum by (job) (up))"}, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "foo"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "bar"}, + }, + Sample: &cortexpb.Sample{Value: 2, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]},{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`, + }, + { + name: "merge two responses with topk", + req: &tripperware.PrometheusRequest{Query: "topk(10, up) by(job)"}, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: 
model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "foo"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "bar"}, + }, + Sample: &cortexpb.Sample{Value: 2, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]},{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`, + }, + { + name: "merge with warnings", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "foo"}, + }, + Sample: &cortexpb.Sample{Value: 2, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Warnings: []string{"warning1", "warning2"}, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "bar"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Warnings: []string{"warning1", "warning3"}, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[1,"1"]},{"metric":{"__name__":"up","job":"foo"},"value":[1,"2"]}]},"warnings":["warning1","warning2","warning3"]}`, + }, + { + name: "merge two responses with stats", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "foo"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + Stats: &tripperware.PrometheusResponseStats{ + 
Samples: &tripperware.PrometheusResponseSamplesStats{ + TotalQueryableSamplesPerStep: []*tripperware.PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 10, TimestampMs: 1000}, + }, + TotalQueryableSamples: 10, + PeakSamples: 10, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "bar"}, + }, + Sample: &cortexpb.Sample{Value: 2, TimestampMs: 2000}, + }, + }, + }, + }, + }, + Stats: &tripperware.PrometheusResponseStats{ + Samples: &tripperware.PrometheusResponseSamplesStats{ + TotalQueryableSamplesPerStep: []*tripperware.PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 10, TimestampMs: 1000}, + }, + TotalQueryableSamples: 10, + PeakSamples: 10, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[2,"2"]},{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}],"stats":{"samples":{"totalQueryableSamples":20,"totalQueryableSamplesPerStep":[[1,20]],"peakSamples":10}}}}`, + }, + { + name: "responses don't contain vector, should return an error", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValString.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"string","result":[1662682521.409,"foo"]}`), + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValString.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"string","result":[1662682521.409,"foo"]}`), + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedErr: errors.New("unexpected result type: string"), + }, + { + name: "single matrix response", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + }, + Samples: []cortexpb.Sample{ + {Value: 1, TimestampMs: 1000}, + {Value: 2, TimestampMs: 2000}, + }, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"up"},"values":[[1,"1"],[2,"2"]]}]}}`, + }, + { + name: "multiple matrix responses without 
duplicated series", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 1, TimestampMs: 1000}, + {Value: 2, TimestampMs: 2000}, + }, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "foo"}, + }, + Samples: []cortexpb.Sample{ + {Value: 3, TimestampMs: 3000}, + {Value: 4, TimestampMs: 4000}, + }, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[1,"1"],[2,"2"]]},{"metric":{"__name__":"foo"},"values":[[3,"3"],[4,"4"]]}]}}`, + }, + { + name: "multiple matrix responses with duplicated series, but not same samples", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 1, TimestampMs: 1000}, + {Value: 2, TimestampMs: 2000}, + }, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 3, TimestampMs: 3000}, + }, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[1,"1"],[2,"2"],[3,"3"]]}]}}`, + }, + { + name: "multiple matrix responses with duplicated series and same samples", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "bar"}, + }, + Samples: 
[]cortexpb.Sample{ + {Value: 1, TimestampMs: 1000}, + {Value: 2, TimestampMs: 2000}, + }, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 1, TimestampMs: 1000}, + {Value: 2, TimestampMs: 2000}, + {Value: 3, TimestampMs: 3000}, + }, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedResp: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[1,"1"],[2,"2"],[3,"3"]]}]}}`, + }, + { + name: "context cancelled before decoding response", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "foo"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "bar"}, + }, + Sample: &cortexpb.Sample{Value: 2, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedDecodeErr: context.Canceled, + cancelBeforeDecode: true, + }, + { + name: "context cancelled before merging response", + req: defaultReq, + resps: []*tripperware.PrometheusResponse{ + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "foo"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + { + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "up"}, + {Name: "job", Value: "bar"}, + }, + Sample: 
&cortexpb.Sample{Value: 2, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + Headers: []*tripperware.PrometheusResponseHeader{ + {Name: "Content-Type", Values: []string{"application/x-protobuf"}}, + }, + }, + }, + expectedErr: context.Canceled, + cancelBeforeMerge: true, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + ctx, cancelCtx := context.WithCancel(context.Background()) + + var resps []tripperware.Response + for _, r := range tc.resps { + protobuf, err := proto.Marshal(r) + require.NoError(t, err) + hr := &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{"application/x-protobuf"}}, + Body: io.NopCloser(bytes.NewBuffer(protobuf)), + } + + if tc.cancelBeforeDecode { + cancelCtx() + } + dr, err := InstantQueryCodec.DecodeResponse(ctx, hr, nil) + assert.Equal(t, tc.expectedDecodeErr, err) + if err != nil { + cancelCtx() + return + } + resps = append(resps, dr) + } + + if tc.cancelBeforeMerge { + cancelCtx() + } + resp, err := InstantQueryCodec.MergeResponse(ctx, tc.req, resps...) + assert.Equal(t, tc.expectedErr, err) + if err != nil { + cancelCtx() + return + } + dr, err := InstantQueryCodec.EncodeResponse(ctx, resp) + assert.Equal(t, tc.expectedErr, err) + contents, err := io.ReadAll(dr.Body) + assert.Equal(t, tc.expectedErr, err) + assert.Equal(t, tc.expectedResp, string(contents)) + cancelCtx() + }) + } +} + func Benchmark_Decode(b *testing.B) { maxSamplesCount := 1000000 samples := make([]tripperware.SampleStream, maxSamplesCount) @@ -560,3 +1740,67 @@ func Benchmark_Decode(b *testing.B) { } } + +func Benchmark_Decode_Protobuf(b *testing.B) { + maxSamplesCount := 1000000 + samples := make([]tripperware.SampleStream, maxSamplesCount) + + for i := 0; i < maxSamplesCount; i++ { + samples[i].Labels = append(samples[i].Labels, cortexpb.LabelAdapter{Name: fmt.Sprintf("Sample%v", i), Value: fmt.Sprintf("Value%v", i)}) + samples[i].Labels = append(samples[i].Labels, cortexpb.LabelAdapter{Name: fmt.Sprintf("Sample2%v", i), Value: fmt.Sprintf("Value2%v", i)}) + samples[i].Labels = append(samples[i].Labels, cortexpb.LabelAdapter{Name: fmt.Sprintf("Sample3%v", i), Value: fmt.Sprintf("Value3%v", i)}) + samples[i].Samples = append(samples[i].Samples, cortexpb.Sample{TimestampMs: int64(i), Value: float64(i)}) + } + + for name, tc := range map[string]struct { + sampleStream []tripperware.SampleStream + }{ + "100 samples": { + sampleStream: samples[:100], + }, + "1000 samples": { + sampleStream: samples[:1000], + }, + "10000 samples": { + sampleStream: samples[:10000], + }, + "100000 samples": { + sampleStream: samples[:100000], + }, + "1000000 samples": { + sampleStream: samples[:1000000], + }, + } { + b.Run(name, func(b *testing.B) { + r := tripperware.PrometheusResponse{ + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: tc.sampleStream, + }, + }, + }, + }, + } + + body, err := proto.Marshal(&r) + require.NoError(b, err) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + response := &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{"application/x-protobuf"}}, + Body: io.NopCloser(bytes.NewBuffer(body)), + } + _, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil) + require.NoError(b, err) + } + }) + } + +} diff --git a/pkg/querier/tripperware/instantquery/shard_by_query_test.go 
b/pkg/querier/tripperware/instantquery/shard_by_query_test.go index 0d4dfc41a95..aac85b2a9a4 100644 --- a/pkg/querier/tripperware/instantquery/shard_by_query_test.go +++ b/pkg/querier/tripperware/instantquery/shard_by_query_test.go @@ -9,5 +9,5 @@ import ( func Test_shardQuery(t *testing.T) { t.Parallel() - tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true)) + tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true, "", "protobuf")) } diff --git a/pkg/querier/tripperware/merge.go b/pkg/querier/tripperware/merge.go index 5eb37387c59..8c45ebd0b6a 100644 --- a/pkg/querier/tripperware/merge.go +++ b/pkg/querier/tripperware/merge.go @@ -135,7 +135,7 @@ func matrixMerge(ctx context.Context, resps []*PrometheusResponse) ([]SampleStre } func vectorMerge(ctx context.Context, req Request, resps []*PrometheusResponse) (*Vector, error) { - output := map[string]*Sample{} + output := map[string]Sample{} metrics := []string{} // Used to preserve the order for topk and bottomk. sortPlan, err := sortPlanForQuery(req.GetQuery()) if err != nil { @@ -156,9 +156,6 @@ func vectorMerge(ctx context.Context, req Request, resps []*PrometheusResponse) } for _, sample := range resp.Data.Result.GetVector().Samples { s := sample - if s == nil { - continue - } metric := string(cortexpb.FromLabelAdaptersToLabels(sample.Labels).Bytes(buf)) if existingSample, ok := output[metric]; !ok { output[metric] = s @@ -171,7 +168,7 @@ func vectorMerge(ctx context.Context, req Request, resps []*PrometheusResponse) } result := &Vector{ - Samples: make([]*Sample, 0, len(output)), + Samples: make([]Sample, 0, len(output)), } if len(output) == 0 { @@ -272,7 +269,7 @@ const ( type pair struct { metric string - s *Sample + s Sample } // getSortValueFromPair gets the float value used for sorting from samples. diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index 903ea9679e2..28e7c2430b4 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -12,6 +12,8 @@ import ( "time" "unsafe" + "github.com/golang/snappy" + "github.com/go-kit/log" "github.com/gogo/protobuf/proto" jsoniter "github.com/json-iterator/go" @@ -36,6 +38,18 @@ var ( }.Froze() ) +type CodecType string +type Compression string + +const ( + GzipCompression Compression = "gzip" + NonCompression Compression = "" + JsonCodecType CodecType = "json" + ProtobufCodecType CodecType = "protobuf" + ApplicationProtobuf string = "application/x-protobuf" + ApplicationJson string = "application/json" +) + // Codec is used to encode/decode query range requests and responses so they can be passed down to middlewares. 
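The CodecType and Compression types introduced above give this patch a shared vocabulary for content negotiation: the frontend advertises what it can accept, and the querier answers with a matching Content-Type. The querier-side selection logic is not part of this hunk, so the following Go sketch is purely illustrative; chooseContentType is a hypothetical helper, not code from this change.

package main

import (
	"fmt"
	"net/http"
	"strings"
)

const (
	applicationProtobuf = "application/x-protobuf"
	applicationJSON     = "application/json"
)

// chooseContentType picks a response serialization from the request's Accept
// header, falling back to JSON when protobuf was not offered.
// Hypothetical helper for illustration only.
func chooseContentType(r *http.Request) string {
	for _, accepted := range strings.Split(r.Header.Get("Accept"), ",") {
		if strings.TrimSpace(accepted) == applicationProtobuf {
			return applicationProtobuf
		}
	}
	return applicationJSON
}

func main() {
	req, _ := http.NewRequest(http.MethodGet, "http://querier/api/v1/query", nil)
	req.Header.Set("Accept", applicationProtobuf+", "+applicationJSON)
	fmt.Println(chooseContentType(req)) // application/x-protobuf
}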
type Codec interface { Merger @@ -455,6 +469,9 @@ func BodyBuffer(res *http.Response, logger log.Logger) ([]byte, error) { defer runutil.CloseWithLogOnErr(logger, gReader, "close gzip reader") return io.ReadAll(gReader) + } else if strings.EqualFold(res.Header.Get("Content-Encoding"), "snappy") { + sReader := snappy.NewReader(buf) + return io.ReadAll(sReader) } return buf.Bytes(), nil @@ -494,7 +511,7 @@ func (s *PrometheusData) UnmarshalJSON(data []byte) error { switch s.ResultType { case model.ValVector.String(): var result struct { - Samples []*Sample `json:"result"` + Samples []Sample `json:"result"` } if err := json.Unmarshal(data, &result); err != nil { return err @@ -530,7 +547,7 @@ func (s *PrometheusData) MarshalJSON() ([]byte, error) { case model.ValVector.String(): res := struct { ResultType string `json:"resultType"` - Data []*Sample `json:"result"` + Data []Sample `json:"result"` Stats *PrometheusResponseStats `json:"stats,omitempty"` }{ ResultType: s.ResultType, @@ -721,3 +738,34 @@ func marshalHistogramBucket(b HistogramBucket, stream *jsoniter.Stream) { jsonutil.MarshalFloat(b.Count, stream) stream.WriteArrayEnd() } + +func (s *PrometheusResponseStats) MarshalJSON() ([]byte, error) { + stats := struct { + Samples *PrometheusResponseSamplesStats `json:"samples"` + }{ + Samples: s.Samples, + } + if s.Samples.TotalQueryableSamplesPerStep == nil { + s.Samples.TotalQueryableSamplesPerStep = []*PrometheusResponseQueryableSamplesStatsPerStep{} + } + return json.Marshal(stats) +} + +func SetRequestHeaders(h http.Header, defaultCodecType CodecType, compression Compression) { + if compression == GzipCompression { + h.Set("Accept-Encoding", string(GzipCompression)) + } + if defaultCodecType == ProtobufCodecType { + h.Set("Accept", ApplicationProtobuf+", "+ApplicationJson) + } else { + h.Set("Accept", ApplicationJson) + } +} + +func UnmarshalResponse(r *http.Response, buf []byte, resp *PrometheusResponse) error { + if r.Header != nil && r.Header.Get("Content-Type") == ApplicationProtobuf { + return proto.Unmarshal(buf, resp) + } else { + return json.Unmarshal(buf, resp) + } +} diff --git a/pkg/querier/tripperware/query.pb.go b/pkg/querier/tripperware/query.pb.go index ed3fb150845..2e16fc9c6db 100644 --- a/pkg/querier/tripperware/query.pb.go +++ b/pkg/querier/tripperware/query.pb.go @@ -835,7 +835,7 @@ func (*PrometheusQueryResult) XXX_OneofWrappers() []interface{} { } type Vector struct { - Samples []*Sample `protobuf:"bytes,1,rep,name=samples,proto3" json:"samples,omitempty"` + Samples []Sample `protobuf:"bytes,1,rep,name=samples,proto3" json:"samples"` } func (m *Vector) Reset() { *m = Vector{} } @@ -870,7 +870,7 @@ func (m *Vector) XXX_DiscardUnknown() { var xxx_messageInfo_Vector proto.InternalMessageInfo -func (m *Vector) GetSamples() []*Sample { +func (m *Vector) GetSamples() []Sample { if m != nil { return m.Samples } @@ -994,83 +994,83 @@ func init() { func init() { proto.RegisterFile("query.proto", fileDescriptor_5c6ac9b241082464) } var fileDescriptor_5c6ac9b241082464 = []byte{ - // 1207 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x56, 0x4b, 0x6f, 0x1b, 0xd5, - 0x17, 0xf7, 0xf8, 0x31, 0x76, 0x8e, 0xd3, 0xa4, 0xff, 0x9b, 0x3e, 0x9c, 0xfe, 0xcb, 0x8c, 0x19, - 0x81, 0x14, 0x04, 0x75, 0x44, 0x10, 0x54, 0x80, 0x54, 0xd1, 0x81, 0x42, 0x5a, 0x28, 0x6d, 0x6f, - 0xaa, 0x22, 0xb1, 0xa9, 0xae, 0xed, 0x5b, 0x67, 0x88, 0xe7, 0xd1, 0x3b, 0x77, 0x9a, 0x98, 0x15, - 0x6b, 0x16, 0x88, 0x35, 0x12, 0x0b, 0x76, 0x2c, 0xf8, 0x20, 0x59, 
0x76, 0x59, 0x21, 0x31, 0x22, - 0xce, 0x06, 0xcd, 0xaa, 0x1f, 0x01, 0xdd, 0xc7, 0xd8, 0xe3, 0xc4, 0x49, 0xd4, 0x15, 0x1b, 0x67, - 0xce, 0x39, 0xbf, 0xf3, 0xbc, 0xe7, 0x11, 0x68, 0x3e, 0x4d, 0x28, 0x1b, 0x75, 0x22, 0x16, 0xf2, - 0x10, 0x35, 0x39, 0xf3, 0xa2, 0x88, 0xb2, 0x5d, 0xc2, 0xe8, 0x95, 0x0b, 0x83, 0x70, 0x10, 0x4a, - 0xfe, 0xba, 0xf8, 0x52, 0x90, 0x2b, 0xd6, 0x20, 0x0c, 0x07, 0x43, 0xba, 0x2e, 0xa9, 0x6e, 0xf2, - 0x64, 0xbd, 0x9f, 0x30, 0xc2, 0xbd, 0x30, 0xd0, 0xf2, 0xd5, 0xa3, 0x72, 0x12, 0x68, 0xeb, 0x57, - 0x3e, 0x1c, 0x78, 0x7c, 0x3b, 0xe9, 0x76, 0x7a, 0xa1, 0xbf, 0xde, 0x0b, 0x19, 0xa7, 0x7b, 0x11, - 0x0b, 0xbf, 0xa3, 0x3d, 0xae, 0xa9, 0xf5, 0x68, 0x67, 0x90, 0x0b, 0xba, 0xfa, 0x43, 0xa9, 0x3a, - 0x3f, 0x56, 0x00, 0xdd, 0x67, 0xa1, 0x4f, 0xf9, 0x36, 0x4d, 0x62, 0x4c, 0xe3, 0x28, 0x0c, 0x62, - 0x8a, 0x1c, 0x30, 0xb7, 0x38, 0xe1, 0x49, 0xdc, 0x32, 0xda, 0xc6, 0xda, 0x82, 0x0b, 0x59, 0x6a, - 0x9b, 0xb1, 0xe4, 0x60, 0x2d, 0x41, 0x5f, 0x40, 0xf5, 0x33, 0xc2, 0x49, 0xab, 0xdc, 0x36, 0xd6, - 0x9a, 0x1b, 0xff, 0xef, 0x14, 0x52, 0xec, 0x4c, 0x4d, 0x0a, 0x88, 0x7b, 0x69, 0x3f, 0xb5, 0x4b, - 0x59, 0x6a, 0x2f, 0xf5, 0x09, 0x27, 0xef, 0x84, 0xbe, 0xc7, 0xa9, 0x1f, 0xf1, 0x11, 0x96, 0x06, - 0xd0, 0xfb, 0xb0, 0x70, 0x8b, 0xb1, 0x90, 0x3d, 0x1c, 0x45, 0xb4, 0x55, 0x91, 0xfe, 0x2e, 0x67, - 0xa9, 0xbd, 0x42, 0x73, 0x66, 0x41, 0x63, 0x8a, 0x44, 0x6f, 0x41, 0x4d, 0x12, 0xad, 0xaa, 0x54, - 0x59, 0xc9, 0x52, 0x7b, 0x59, 0xaa, 0x14, 0xe0, 0x0a, 0x81, 0x3e, 0x87, 0xfa, 0x26, 0x25, 0x7d, - 0xca, 0xe2, 0x56, 0xad, 0x5d, 0x59, 0x6b, 0x6e, 0xbc, 0x79, 0x42, 0xb4, 0x79, 0x01, 0x14, 0xda, - 0xad, 0x65, 0xa9, 0x6d, 0x5c, 0xc3, 0xb9, 0x32, 0xda, 0x80, 0xc6, 0x37, 0x84, 0x05, 0x5e, 0x30, - 0x88, 0x5b, 0x66, 0xbb, 0xb2, 0xb6, 0xe0, 0x5e, 0xca, 0x52, 0x1b, 0xed, 0x6a, 0x5e, 0xc1, 0xf1, - 0x04, 0x27, 0xc2, 0xbc, 0x1d, 0x3c, 0x09, 0xe3, 0x56, 0x5d, 0x2a, 0xc8, 0x30, 0x3d, 0xc1, 0x28, - 0x86, 0x29, 0x11, 0xce, 0x5f, 0x06, 0x2c, 0xcd, 0x56, 0x0e, 0x75, 0x00, 0x30, 0x8d, 0x93, 0x21, - 0x97, 0xc5, 0x51, 0x8f, 0xb1, 0x94, 0xa5, 0x36, 0xb0, 0x09, 0x17, 0x17, 0x10, 0xe8, 0x0e, 0x98, - 0x8a, 0xd2, 0xcf, 0xe2, 0x9c, 0x90, 0xe8, 0x03, 0xd1, 0x9c, 0x0a, 0xe9, 0x2e, 0xe9, 0xd7, 0x31, - 0x95, 0x4d, 0xac, 0x2d, 0xa0, 0x7b, 0x50, 0x13, 0x4f, 0x1e, 0xcb, 0x37, 0x69, 0x6e, 0xbc, 0x71, - 0x46, 0xcd, 0x44, 0x5b, 0xc4, 0x2a, 0x3f, 0xa9, 0x56, 0xcc, 0x4f, 0x32, 0x9c, 0x1d, 0x58, 0xfa, - 0x94, 0xf4, 0xb6, 0x69, 0x7f, 0xd2, 0x67, 0xab, 0x50, 0xd9, 0xa1, 0x23, 0x9d, 0x57, 0x3d, 0x4b, - 0x6d, 0x41, 0x62, 0xf1, 0x83, 0x6e, 0x40, 0x9d, 0xee, 0x71, 0x1a, 0xf0, 0xb8, 0x55, 0x96, 0x6f, - 0xb6, 0x32, 0xe3, 0xff, 0x96, 0x94, 0xb9, 0xcb, 0x3a, 0xf6, 0x1c, 0x8b, 0xf3, 0x0f, 0xe7, 0x0f, - 0x03, 0x4c, 0x05, 0x42, 0xb6, 0x4c, 0x84, 0x71, 0xe9, 0xa7, 0xe2, 0x2e, 0x64, 0xa9, 0xad, 0x18, - 0x58, 0xfd, 0x11, 0x61, 0xd0, 0xa0, 0x2f, 0x4b, 0x56, 0x51, 0x61, 0xd0, 0xa0, 0x8f, 0xc5, 0x0f, - 0x6a, 0x43, 0x83, 0x33, 0xd2, 0xa3, 0x8f, 0xbd, 0xbe, 0x6e, 0xb4, 0xbc, 0x29, 0x24, 0xfb, 0x76, - 0x1f, 0xdd, 0x80, 0x06, 0xd3, 0xf9, 0xb4, 0x6a, 0xb2, 0x52, 0x17, 0x3a, 0x6a, 0x56, 0x3b, 0xf9, - 0xac, 0x76, 0x6e, 0x06, 0x23, 0x77, 0x31, 0x4b, 0xed, 0x09, 0x12, 0x4f, 0xbe, 0xee, 0x54, 0x1b, - 0x95, 0xf3, 0x55, 0xe7, 0x97, 0x32, 0x2c, 0x6e, 0x11, 0x3f, 0x1a, 0xd2, 0x2d, 0xce, 0x28, 0xf1, - 0xd1, 0x1e, 0x98, 0x43, 0xd2, 0xa5, 0x43, 0x31, 0x82, 0x2a, 0xfd, 0x7c, 0x82, 0x3b, 0x5f, 0x09, - 0xfe, 0x7d, 0xe2, 0x31, 0xf7, 0x4b, 0x91, 0xfe, 0x9f, 0xa9, 0xfd, 0x4a, 0x1b, 0x40, 0xe9, 0xdf, - 0xec, 0x93, 0x88, 0x53, 0x26, 0xde, 0xdd, 0xa7, 0x9c, 0x79, 0x3d, 0xac, 0xfd, 0xa1, 0x8f, 
0xa0, - 0x1e, 0xcb, 0x48, 0xf2, 0xca, 0x9f, 0x9f, 0xba, 0x56, 0x21, 0x4e, 0x5b, 0xe6, 0x19, 0x19, 0x26, - 0x34, 0xc6, 0xb9, 0x02, 0x7a, 0x08, 0xb0, 0xed, 0xc5, 0x3c, 0x1c, 0x30, 0xe2, 0x8b, 0xc6, 0x11, - 0xea, 0xed, 0x99, 0x87, 0x53, 0x16, 0x36, 0x73, 0x90, 0x4c, 0x03, 0x69, 0x73, 0x05, 0x5d, 0x5c, - 0xf8, 0x76, 0xbe, 0x87, 0x95, 0x39, 0x6a, 0xe8, 0x75, 0x58, 0xe4, 0x9e, 0x4f, 0x63, 0x4e, 0xfc, - 0xe8, 0xb1, 0xaf, 0x76, 0x55, 0x05, 0x37, 0x27, 0xbc, 0xbb, 0x31, 0xfa, 0x04, 0x16, 0x26, 0x76, - 0xf4, 0x48, 0x5c, 0x3d, 0x2d, 0x1c, 0xb7, 0x2a, 0x42, 0xc1, 0x53, 0x25, 0xe7, 0x29, 0x2c, 0x1f, - 0xc1, 0xa0, 0x0b, 0x50, 0xeb, 0x85, 0x49, 0xa0, 0xfa, 0xc9, 0xc0, 0x8a, 0x40, 0xe7, 0xa1, 0x12, - 0x27, 0xca, 0x89, 0x81, 0xc5, 0x27, 0xfa, 0x00, 0xea, 0xdd, 0xa4, 0xb7, 0x43, 0x79, 0x5e, 0x89, - 0x59, 0xd7, 0x53, 0xa7, 0x12, 0x84, 0x73, 0xb0, 0x13, 0xc3, 0xf2, 0x11, 0x19, 0xb2, 0x00, 0xba, - 0x61, 0x12, 0xf4, 0x09, 0xf3, 0xa8, 0x4a, 0xb4, 0x86, 0x0b, 0x1c, 0x11, 0xd2, 0x30, 0xdc, 0xa5, - 0x4c, 0xbb, 0x57, 0x84, 0xe0, 0x26, 0xc2, 0x9d, 0x9c, 0x60, 0x03, 0x2b, 0x62, 0x1a, 0x7e, 0xb5, - 0x10, 0xbe, 0xe3, 0xc3, 0xe5, 0x13, 0x66, 0x1a, 0xe1, 0x69, 0x43, 0x18, 0xb2, 0x84, 0x6f, 0x9f, - 0xb5, 0x0a, 0x14, 0x5a, 0x6d, 0x84, 0xa6, 0x18, 0x4f, 0xad, 0x3f, 0x69, 0x14, 0x67, 0xbf, 0x0c, - 0xd6, 0xe9, 0x8a, 0xe8, 0x1e, 0x5c, 0xe4, 0x21, 0x27, 0x43, 0xb9, 0xab, 0x48, 0x77, 0x98, 0x4b, - 0xf5, 0x18, 0xaf, 0x66, 0xa9, 0x3d, 0x1f, 0x80, 0xe7, 0xb3, 0xd1, 0x6f, 0x06, 0x5c, 0x9d, 0x2b, - 0xb9, 0x4f, 0xd9, 0x16, 0xa7, 0x91, 0x6e, 0xf7, 0x8f, 0xcf, 0xc8, 0xee, 0xa8, 0xb6, 0x8c, 0x56, - 0x9b, 0x70, 0xdb, 0x59, 0x6a, 0x9f, 0xea, 0x04, 0x9f, 0x2a, 0x45, 0xef, 0x42, 0x33, 0xa2, 0x64, - 0x27, 0x4f, 0xb5, 0x22, 0x53, 0x5d, 0xce, 0x52, 0xbb, 0xc8, 0xc6, 0x45, 0xc2, 0xf1, 0xe0, 0x15, - 0x83, 0x14, 0x1d, 0x20, 0x07, 0x57, 0x4f, 0x8c, 0x22, 0x8e, 0x8d, 0x53, 0xf9, 0xd8, 0x38, 0x39, - 0x0f, 0xa1, 0x75, 0xd2, 0xb1, 0x44, 0xab, 0x50, 0xfd, 0x9a, 0xf8, 0xf9, 0x91, 0xd2, 0x5b, 0x52, - 0xb2, 0xd0, 0x6b, 0x60, 0x3e, 0x92, 0x8b, 0x42, 0x56, 0x78, 0x22, 0xd4, 0x4c, 0xe7, 0x57, 0x03, - 0x2e, 0xce, 0x3d, 0x4d, 0xe8, 0x1a, 0x98, 0xcf, 0x68, 0x8f, 0x87, 0x4c, 0x37, 0xde, 0xec, 0x0d, - 0x78, 0x24, 0x45, 0x9b, 0x25, 0xac, 0x41, 0xe8, 0x2a, 0x34, 0x18, 0xd9, 0x75, 0x47, 0x9c, 0xaa, - 0xe8, 0x17, 0x37, 0x4b, 0x78, 0xc2, 0x11, 0xc6, 0x7c, 0xc2, 0x99, 0xb7, 0xa7, 0x0f, 0xda, 0xac, - 0xb1, 0xbb, 0x52, 0x24, 0x8c, 0x29, 0x90, 0xdb, 0x00, 0x7d, 0x10, 0x9d, 0xeb, 0x60, 0x2a, 0x57, - 0xe8, 0x5a, 0x71, 0x12, 0x8e, 0x1f, 0x25, 0x55, 0xeb, 0x69, 0x93, 0xff, 0x54, 0x06, 0x53, 0xf1, - 0xfe, 0xc3, 0x75, 0x7e, 0x1d, 0x4c, 0x15, 0x8f, 0xde, 0x7f, 0xc7, 0xb7, 0xf9, 0xb9, 0xfd, 0xd4, - 0x36, 0xc4, 0x51, 0x94, 0x7d, 0x80, 0x35, 0x1c, 0x3d, 0x28, 0xee, 0x4e, 0x55, 0xb2, 0xb3, 0x57, - 0xf9, 0xff, 0xb4, 0xad, 0xa9, 0x6a, 0x71, 0x99, 0xde, 0x03, 0x53, 0xd5, 0x19, 0xdd, 0x82, 0x73, - 0x71, 0xe1, 0xdc, 0xe5, 0x65, 0x59, 0x9d, 0xe3, 0x40, 0x21, 0xf4, 0x66, 0x9e, 0xd5, 0x72, 0x6f, - 0x3e, 0x3f, 0xb0, 0x4a, 0x2f, 0x0e, 0xac, 0xd2, 0xcb, 0x03, 0xcb, 0xf8, 0x61, 0x6c, 0x19, 0xbf, - 0x8f, 0x2d, 0x63, 0x7f, 0x6c, 0x19, 0xcf, 0xc7, 0x96, 0xf1, 0xf7, 0xd8, 0x32, 0xfe, 0x19, 0x5b, - 0xa5, 0x97, 0x63, 0xcb, 0xf8, 0xf9, 0xd0, 0x2a, 0x3d, 0x3f, 0xb4, 0x4a, 0x2f, 0x0e, 0xad, 0xd2, - 0xb7, 0xc5, 0x7f, 0xc7, 0xbb, 0xa6, 0xbc, 0xd2, 0xef, 0xfd, 0x1b, 0x00, 0x00, 0xff, 0xff, 0x31, - 0xdc, 0x8d, 0x85, 0xb1, 0x0b, 0x00, 0x00, + // 1208 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x56, 0x4b, 0x6f, 0x1b, 0x55, + 0x14, 0xf6, 0xf8, 0x31, 
0x76, 0x8e, 0xd3, 0xa4, 0xdc, 0xf4, 0xe1, 0x94, 0x32, 0x63, 0x46, 0x20, + 0x05, 0x41, 0x1d, 0x91, 0x0a, 0x10, 0x20, 0x2a, 0x3a, 0x50, 0x48, 0x0b, 0xa5, 0xed, 0x4d, 0x55, + 0x24, 0x36, 0xd5, 0xb5, 0x7d, 0xeb, 0x0c, 0xf1, 0x3c, 0x7a, 0xe7, 0x4e, 0x13, 0xb3, 0x62, 0xcd, + 0x02, 0xb1, 0x46, 0x62, 0xc1, 0x8e, 0x05, 0x3f, 0x24, 0xcb, 0x2e, 0x2b, 0x24, 0x46, 0xd4, 0xd9, + 0xa0, 0x59, 0xf5, 0x27, 0xa0, 0xfb, 0x18, 0x7b, 0x9c, 0x38, 0x89, 0xba, 0x62, 0xe3, 0xcc, 0x39, + 0xe7, 0x3b, 0xcf, 0x7b, 0x1e, 0x81, 0xe6, 0xe3, 0x84, 0xb2, 0x51, 0x27, 0x62, 0x21, 0x0f, 0x51, + 0x93, 0x33, 0x2f, 0x8a, 0x28, 0xdb, 0x25, 0x8c, 0x5e, 0x3a, 0x37, 0x08, 0x07, 0xa1, 0xe4, 0xaf, + 0x8b, 0x2f, 0x05, 0xb9, 0x64, 0x0d, 0xc2, 0x70, 0x30, 0xa4, 0xeb, 0x92, 0xea, 0x26, 0x8f, 0xd6, + 0xfb, 0x09, 0x23, 0xdc, 0x0b, 0x03, 0x2d, 0x5f, 0x3d, 0x2c, 0x27, 0x81, 0xb6, 0x7e, 0xe9, 0xc3, + 0x81, 0xc7, 0xb7, 0x93, 0x6e, 0xa7, 0x17, 0xfa, 0xeb, 0xbd, 0x90, 0x71, 0xba, 0x17, 0xb1, 0xf0, + 0x7b, 0xda, 0xe3, 0x9a, 0x5a, 0x8f, 0x76, 0x06, 0xb9, 0xa0, 0xab, 0x3f, 0x94, 0xaa, 0xf3, 0x53, + 0x05, 0xd0, 0x5d, 0x16, 0xfa, 0x94, 0x6f, 0xd3, 0x24, 0xc6, 0x34, 0x8e, 0xc2, 0x20, 0xa6, 0xc8, + 0x01, 0x73, 0x8b, 0x13, 0x9e, 0xc4, 0x2d, 0xa3, 0x6d, 0xac, 0x2d, 0xb8, 0x90, 0xa5, 0xb6, 0x19, + 0x4b, 0x0e, 0xd6, 0x12, 0xf4, 0x25, 0x54, 0x3f, 0x27, 0x9c, 0xb4, 0xca, 0x6d, 0x63, 0xad, 0xb9, + 0xf1, 0x6a, 0xa7, 0x90, 0x62, 0x67, 0x6a, 0x52, 0x40, 0xdc, 0x0b, 0xfb, 0xa9, 0x5d, 0xca, 0x52, + 0x7b, 0xa9, 0x4f, 0x38, 0x79, 0x27, 0xf4, 0x3d, 0x4e, 0xfd, 0x88, 0x8f, 0xb0, 0x34, 0x80, 0xde, + 0x83, 0x85, 0x1b, 0x8c, 0x85, 0xec, 0xfe, 0x28, 0xa2, 0xad, 0x8a, 0xf4, 0x77, 0x31, 0x4b, 0xed, + 0x15, 0x9a, 0x33, 0x0b, 0x1a, 0x53, 0x24, 0x7a, 0x0b, 0x6a, 0x92, 0x68, 0x55, 0xa5, 0xca, 0x4a, + 0x96, 0xda, 0xcb, 0x52, 0xa5, 0x00, 0x57, 0x08, 0xf4, 0x05, 0xd4, 0x37, 0x29, 0xe9, 0x53, 0x16, + 0xb7, 0x6a, 0xed, 0xca, 0x5a, 0x73, 0xe3, 0xcd, 0x63, 0xa2, 0xcd, 0x0b, 0xa0, 0xd0, 0x6e, 0x2d, + 0x4b, 0x6d, 0xe3, 0x0a, 0xce, 0x95, 0xd1, 0x06, 0x34, 0xbe, 0x25, 0x2c, 0xf0, 0x82, 0x41, 0xdc, + 0x32, 0xdb, 0x95, 0xb5, 0x05, 0xf7, 0x42, 0x96, 0xda, 0x68, 0x57, 0xf3, 0x0a, 0x8e, 0x27, 0x38, + 0x11, 0xe6, 0xcd, 0xe0, 0x51, 0x18, 0xb7, 0xea, 0x52, 0x41, 0x86, 0xe9, 0x09, 0x46, 0x31, 0x4c, + 0x89, 0x70, 0xfe, 0x36, 0x60, 0x69, 0xb6, 0x72, 0xa8, 0x03, 0x80, 0x69, 0x9c, 0x0c, 0xb9, 0x2c, + 0x8e, 0x7a, 0x8c, 0xa5, 0x2c, 0xb5, 0x81, 0x4d, 0xb8, 0xb8, 0x80, 0x40, 0xb7, 0xc0, 0x54, 0x94, + 0x7e, 0x16, 0xe7, 0x98, 0x44, 0xef, 0x89, 0xe6, 0x54, 0x48, 0x77, 0x49, 0xbf, 0x8e, 0xa9, 0x6c, + 0x62, 0x6d, 0x01, 0xdd, 0x81, 0x9a, 0x78, 0xf2, 0x58, 0xbe, 0x49, 0x73, 0xe3, 0x8d, 0x53, 0x6a, + 0x26, 0xda, 0x22, 0x56, 0xf9, 0x49, 0xb5, 0x62, 0x7e, 0x92, 0xe1, 0xec, 0xc0, 0xd2, 0x67, 0xa4, + 0xb7, 0x4d, 0xfb, 0x93, 0x3e, 0x5b, 0x85, 0xca, 0x0e, 0x1d, 0xe9, 0xbc, 0xea, 0x59, 0x6a, 0x0b, + 0x12, 0x8b, 0x1f, 0x74, 0x0d, 0xea, 0x74, 0x8f, 0xd3, 0x80, 0xc7, 0xad, 0xb2, 0x7c, 0xb3, 0x95, + 0x19, 0xff, 0x37, 0xa4, 0xcc, 0x5d, 0xd6, 0xb1, 0xe7, 0x58, 0x9c, 0x7f, 0x38, 0x7f, 0x1a, 0x60, + 0x2a, 0x10, 0xb2, 0x65, 0x22, 0x8c, 0x4b, 0x3f, 0x15, 0x77, 0x21, 0x4b, 0x6d, 0xc5, 0xc0, 0xea, + 0x8f, 0x08, 0x83, 0x06, 0x7d, 0x59, 0xb2, 0x8a, 0x0a, 0x83, 0x06, 0x7d, 0x2c, 0x7e, 0x50, 0x1b, + 0x1a, 0x9c, 0x91, 0x1e, 0x7d, 0xe8, 0xf5, 0x75, 0xa3, 0xe5, 0x4d, 0x21, 0xd9, 0x37, 0xfb, 0xe8, + 0x1a, 0x34, 0x98, 0xce, 0xa7, 0x55, 0x93, 0x95, 0x3a, 0xd7, 0x51, 0xb3, 0xda, 0xc9, 0x67, 0xb5, + 0x73, 0x3d, 0x18, 0xb9, 0x8b, 0x59, 0x6a, 0x4f, 0x90, 0x78, 0xf2, 0x75, 0xab, 0xda, 0xa8, 0x9c, + 0xad, 0x3a, 0xbf, 0x96, 0x61, 0x71, 0x8b, 0xf8, 
0xd1, 0x90, 0x6e, 0x71, 0x46, 0x89, 0x8f, 0xf6, + 0xc0, 0x1c, 0x92, 0x2e, 0x1d, 0x8a, 0x11, 0x54, 0xe9, 0xe7, 0x13, 0xdc, 0xf9, 0x5a, 0xf0, 0xef, + 0x12, 0x8f, 0xb9, 0x5f, 0x89, 0xf4, 0xff, 0x4a, 0xed, 0x97, 0xda, 0x00, 0x4a, 0xff, 0x7a, 0x9f, + 0x44, 0x9c, 0x32, 0xf1, 0xee, 0x3e, 0xe5, 0xcc, 0xeb, 0x61, 0xed, 0x0f, 0x7d, 0x04, 0xf5, 0x58, + 0x46, 0x92, 0x57, 0xfe, 0xec, 0xd4, 0xb5, 0x0a, 0x71, 0xda, 0x32, 0x4f, 0xc8, 0x30, 0xa1, 0x31, + 0xce, 0x15, 0xd0, 0x7d, 0x80, 0x6d, 0x2f, 0xe6, 0xe1, 0x80, 0x11, 0x5f, 0x34, 0x8e, 0x50, 0x6f, + 0xcf, 0x3c, 0x9c, 0xb2, 0xb0, 0x99, 0x83, 0x64, 0x1a, 0x48, 0x9b, 0x2b, 0xe8, 0xe2, 0xc2, 0xb7, + 0xf3, 0x03, 0xac, 0xcc, 0x51, 0x43, 0xaf, 0xc3, 0x22, 0xf7, 0x7c, 0x1a, 0x73, 0xe2, 0x47, 0x0f, + 0x7d, 0xb5, 0xab, 0x2a, 0xb8, 0x39, 0xe1, 0xdd, 0x8e, 0xd1, 0xa7, 0xb0, 0x30, 0xb1, 0xa3, 0x47, + 0xe2, 0xf2, 0x49, 0xe1, 0xb8, 0x55, 0x11, 0x0a, 0x9e, 0x2a, 0x39, 0x8f, 0x61, 0xf9, 0x10, 0x06, + 0x9d, 0x83, 0x5a, 0x2f, 0x4c, 0x02, 0xd5, 0x4f, 0x06, 0x56, 0x04, 0x3a, 0x0b, 0x95, 0x38, 0x51, + 0x4e, 0x0c, 0x2c, 0x3e, 0xd1, 0xfb, 0x50, 0xef, 0x26, 0xbd, 0x1d, 0xca, 0xf3, 0x4a, 0xcc, 0xba, + 0x9e, 0x3a, 0x95, 0x20, 0x9c, 0x83, 0x9d, 0x18, 0x96, 0x0f, 0xc9, 0x90, 0x05, 0xd0, 0x0d, 0x93, + 0xa0, 0x4f, 0x98, 0x47, 0x55, 0xa2, 0x35, 0x5c, 0xe0, 0x88, 0x90, 0x86, 0xe1, 0x2e, 0x65, 0xda, + 0xbd, 0x22, 0x04, 0x37, 0x11, 0xee, 0xe4, 0x04, 0x1b, 0x58, 0x11, 0xd3, 0xf0, 0xab, 0x85, 0xf0, + 0x1d, 0x1f, 0x2e, 0x1e, 0x33, 0xd3, 0x08, 0x4f, 0x1b, 0xc2, 0x90, 0x25, 0x7c, 0xfb, 0xb4, 0x55, + 0xa0, 0xd0, 0x6a, 0x23, 0x34, 0xc5, 0x78, 0x6a, 0xfd, 0x49, 0xa3, 0x38, 0xfb, 0x65, 0xb0, 0x4e, + 0x56, 0x44, 0x77, 0xe0, 0x3c, 0x0f, 0x39, 0x19, 0xca, 0x5d, 0x45, 0xba, 0xc3, 0x5c, 0xaa, 0xc7, + 0x78, 0x35, 0x4b, 0xed, 0xf9, 0x00, 0x3c, 0x9f, 0x8d, 0x7e, 0x37, 0xe0, 0xf2, 0x5c, 0xc9, 0x5d, + 0xca, 0xb6, 0x38, 0x8d, 0x74, 0xbb, 0x7f, 0x7c, 0x4a, 0x76, 0x87, 0xb5, 0x65, 0xb4, 0xda, 0x84, + 0xdb, 0xce, 0x52, 0xfb, 0x44, 0x27, 0xf8, 0x44, 0x29, 0x7a, 0x17, 0x9a, 0x11, 0x25, 0x3b, 0x79, + 0xaa, 0x15, 0x99, 0xea, 0x72, 0x96, 0xda, 0x45, 0x36, 0x2e, 0x12, 0x8e, 0x07, 0x2f, 0x19, 0xa4, + 0xe8, 0x00, 0x39, 0xb8, 0x7a, 0x62, 0x14, 0x71, 0x64, 0x9c, 0xca, 0x47, 0xc6, 0xc9, 0xb9, 0x0f, + 0xad, 0xe3, 0x8e, 0x25, 0x5a, 0x85, 0xea, 0x37, 0xc4, 0xcf, 0x8f, 0x94, 0xde, 0x92, 0x92, 0x85, + 0x5e, 0x03, 0xf3, 0x81, 0x5c, 0x14, 0xb2, 0xc2, 0x13, 0xa1, 0x66, 0x3a, 0xbf, 0x19, 0x70, 0x7e, + 0xee, 0x69, 0x42, 0x57, 0xc0, 0x7c, 0x42, 0x7b, 0x3c, 0x64, 0xba, 0xf1, 0x66, 0x6f, 0xc0, 0x03, + 0x29, 0xda, 0x2c, 0x61, 0x0d, 0x42, 0x97, 0xa1, 0xc1, 0xc8, 0xae, 0x3b, 0xe2, 0x54, 0x45, 0xbf, + 0xb8, 0x59, 0xc2, 0x13, 0x8e, 0x30, 0xe6, 0x13, 0xce, 0xbc, 0x3d, 0x7d, 0xd0, 0x66, 0x8d, 0xdd, + 0x96, 0x22, 0x61, 0x4c, 0x81, 0xdc, 0x06, 0xe8, 0x83, 0xe8, 0x7c, 0x02, 0xa6, 0x72, 0x85, 0xae, + 0x16, 0x27, 0xe1, 0xe8, 0x51, 0xd2, 0xdb, 0x51, 0xed, 0x90, 0x49, 0xab, 0xff, 0x5c, 0x06, 0x53, + 0x49, 0xfe, 0xc7, 0xa5, 0xfe, 0x01, 0x98, 0x2a, 0x1e, 0xbd, 0x05, 0x8f, 0xee, 0xf4, 0x33, 0xfb, + 0xa9, 0x6d, 0x88, 0xd3, 0x28, 0xbb, 0x01, 0x6b, 0x38, 0xba, 0x57, 0xdc, 0xa0, 0xaa, 0x70, 0xa7, + 0x2f, 0xf4, 0x57, 0xb4, 0xad, 0xa9, 0x6a, 0x71, 0xa5, 0xde, 0x01, 0x53, 0x55, 0x1b, 0xdd, 0x80, + 0x33, 0x71, 0xe1, 0xe8, 0xe5, 0x65, 0x59, 0x9d, 0xe3, 0x40, 0x21, 0x74, 0x6d, 0x67, 0xb5, 0xdc, + 0xeb, 0x4f, 0x9f, 0x5b, 0xa5, 0x67, 0xcf, 0xad, 0xd2, 0x8b, 0xe7, 0x96, 0xf1, 0xe3, 0xd8, 0x32, + 0xfe, 0x18, 0x5b, 0xc6, 0xfe, 0xd8, 0x32, 0x9e, 0x8e, 0x2d, 0xe3, 0x9f, 0xb1, 0x65, 0xfc, 0x3b, + 0xb6, 0x4a, 0x2f, 0xc6, 0x96, 0xf1, 0xcb, 0x81, 0x55, 0x7a, 0x7a, 0x60, 
0x95, 0x9e, 0x1d, 0x58, + 0xa5, 0xef, 0x8a, 0xff, 0x94, 0x77, 0x4d, 0x79, 0xab, 0xaf, 0xfe, 0x17, 0x00, 0x00, 0xff, 0xff, + 0xc9, 0x8e, 0x9e, 0x8e, 0xb7, 0x0b, 0x00, 0x00, } func (this *PrometheusResponse) Equal(that interface{}) bool { @@ -1608,7 +1608,7 @@ func (this *Vector) Equal(that interface{}) bool { return false } for i := range this.Samples { - if !this.Samples[i].Equal(that1.Samples[i]) { + if !this.Samples[i].Equal(&that1.Samples[i]) { return false } } @@ -1895,7 +1895,11 @@ func (this *Vector) GoString() string { s := make([]string, 0, 5) s = append(s, "&tripperware.Vector{") if this.Samples != nil { - s = append(s, "Samples: "+fmt.Sprintf("%#v", this.Samples)+",\n") + vs := make([]*Sample, len(this.Samples)) + for i := range vs { + vs[i] = &this.Samples[i] + } + s = append(s, "Samples: "+fmt.Sprintf("%#v", vs)+",\n") } s = append(s, "}") return strings.Join(s, "") @@ -3337,9 +3341,9 @@ func (this *Vector) String() string { if this == nil { return "nil" } - repeatedStringForSamples := "[]*Sample{" + repeatedStringForSamples := "[]Sample{" for _, f := range this.Samples { - repeatedStringForSamples += strings.Replace(f.String(), "Sample", "Sample", 1) + "," + repeatedStringForSamples += strings.Replace(strings.Replace(f.String(), "Sample", "Sample", 1), `&`, ``, 1) + "," } repeatedStringForSamples += "}" s := strings.Join([]string{`&Vector{`, @@ -5205,7 +5209,7 @@ func (m *Vector) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Samples = append(m.Samples, &Sample{}) + m.Samples = append(m.Samples, Sample{}) if err := m.Samples[len(m.Samples)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } diff --git a/pkg/querier/tripperware/query.proto b/pkg/querier/tripperware/query.proto index 2df1b705a1c..013b73022a8 100644 --- a/pkg/querier/tripperware/query.proto +++ b/pkg/querier/tripperware/query.proto @@ -97,7 +97,7 @@ message PrometheusQueryResult { } message Vector { - repeated Sample samples = 1; + repeated Sample samples = 1 [(gogoproto.nullable) = false]; } message Sample { diff --git a/pkg/querier/tripperware/queryrange/marshaling_test.go b/pkg/querier/tripperware/queryrange/marshaling_test.go index ce84635dc1f..4652a0e10bd 100644 --- a/pkg/querier/tripperware/queryrange/marshaling_test.go +++ b/pkg/querier/tripperware/queryrange/marshaling_test.go @@ -8,13 +8,14 @@ import ( "net/http" "testing" + "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/tripperware" ) -func BenchmarkPrometheusCodec_DecodeResponse(b *testing.B) { +func BenchmarkPrometheusCodec_DecodeResponse_Json(b *testing.B) { const ( numSeries = 1000 numSamplesPerSeries = 1000 @@ -32,6 +33,33 @@ func BenchmarkPrometheusCodec_DecodeResponse(b *testing.B) { for n := 0; n < b.N; n++ { _, err := PrometheusCodec.DecodeResponse(context.Background(), &http.Response{ StatusCode: 200, + Header: http.Header{"Content-Type": []string{tripperware.ApplicationJson}}, + Body: io.NopCloser(bytes.NewReader(encodedRes)), + ContentLength: int64(len(encodedRes)), + }, nil) + require.NoError(b, err) + } +} + +func BenchmarkPrometheusCodec_DecodeResponse_Protobuf(b *testing.B) { + const ( + numSeries = 1000 + numSamplesPerSeries = 1000 + ) + + // Generate a mocked response and marshal it. 
+ res := mockPrometheusResponse(numSeries, numSamplesPerSeries) + encodedRes, err := proto.Marshal(res) + require.NoError(b, err) + b.Log("test prometheus response size:", len(encodedRes)) + + b.ResetTimer() + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + _, err := PrometheusCodec.DecodeResponse(context.Background(), &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{tripperware.ApplicationProtobuf}}, Body: io.NopCloser(bytes.NewReader(encodedRes)), ContentLength: int64(len(encodedRes)), }, nil) @@ -85,7 +113,7 @@ func mockPrometheusResponse(numSeries, numSamplesPerSeries int) *tripperware.Pro return &tripperware.PrometheusResponse{ Status: "success", Data: tripperware.PrometheusData{ - ResultType: "vector", + ResultType: "matrix", Result: tripperware.PrometheusQueryResult{ Result: &tripperware.PrometheusQueryResult_Matrix{ Matrix: &tripperware.Matrix{ diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index ac36e66b0dc..46f1affb48a 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -56,11 +56,28 @@ func (resp *PrometheusResponse) HTTPHeaders() map[string][]string { } type prometheusCodec struct { - sharded bool + tripperware.Codec + sharded bool + compression tripperware.Compression + defaultCodecType tripperware.CodecType } -func NewPrometheusCodec(sharded bool) *prometheusCodec { //nolint:revive - return &prometheusCodec{sharded: sharded} +func NewPrometheusCodec(sharded bool, compressionStr string, defaultCodecTypeStr string) *prometheusCodec { //nolint:revive + compression := tripperware.NonCompression // default + if compressionStr == string(tripperware.GzipCompression) { + compression = tripperware.GzipCompression + } + + defaultCodecType := tripperware.JsonCodecType // default + if defaultCodecTypeStr == string(tripperware.ProtobufCodecType) { + defaultCodecType = tripperware.ProtobufCodecType + } + + return &prometheusCodec{ + sharded: sharded, + compression: compression, + defaultCodecType: defaultCodecType, + } } func (c prometheusCodec) MergeResponse(ctx context.Context, req tripperware.Request, responses ...tripperware.Response) (tripperware.Response, error) { @@ -135,7 +152,7 @@ func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwa return &result, nil } -func (prometheusCodec) EncodeRequest(ctx context.Context, r tripperware.Request) (*http.Request, error) { +func (c prometheusCodec) EncodeRequest(ctx context.Context, r tripperware.Request) (*http.Request, error) { promReq, ok := r.(*tripperware.PrometheusRequest) if !ok { return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format") @@ -159,8 +176,7 @@ func (prometheusCodec) EncodeRequest(ctx context.Context, r tripperware.Request) } } - // Always ask gzip to the querier - h.Set("Accept-Encoding", "gzip") + tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression) req := &http.Request{ Method: "GET", @@ -173,7 +189,7 @@ func (prometheusCodec) EncodeRequest(ctx context.Context, r tripperware.Request) return req.WithContext(ctx), nil } -func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ tripperware.Request) (tripperware.Response, error) { +func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ tripperware.Request) (tripperware.Response, error) { log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck defer log.Finish() 
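A quick usage sketch, not part of the patch itself: a codec built with NewPrometheusCodec(false, "gzip", "protobuf") makes EncodeRequest go through the new tripperware.SetRequestHeaders helper, so downstream requests carry both Accept and Accept-Encoding headers instead of the previously hard-coded gzip header. The snippet below only exercises the exported helper directly and assumes the usual Cortex module path.

package main

import (
	"fmt"
	"net/http"

	"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

func main() {
	h := http.Header{}
	// Same call EncodeRequest now makes for a protobuf-preferring, gzip-compressed codec.
	tripperware.SetRequestHeaders(h, tripperware.ProtobufCodecType, tripperware.GzipCompression)

	fmt.Println(h.Get("Accept"))          // application/x-protobuf, application/json
	fmt.Println(h.Get("Accept-Encoding")) // gzip
}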
@@ -192,10 +208,21 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ t log.LogFields(otlog.Int("bytes", len(buf))) var resp tripperware.PrometheusResponse - if err := json.Unmarshal(buf, &resp); err != nil { + err = tripperware.UnmarshalResponse(r, buf, &resp) + + if err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) } + // protobuf serialization treats empty slices as nil + if resp.Data.ResultType == model.ValMatrix.String() && resp.Data.Result.GetMatrix().SampleStreams == nil { + resp.Data.Result.GetMatrix().SampleStreams = []tripperware.SampleStream{} + } + + if resp.Headers == nil { + resp.Headers = []*tripperware.PrometheusResponseHeader{} + } + for h, hv := range r.Header { resp.Headers = append(resp.Headers, &tripperware.PrometheusResponseHeader{Name: h, Values: hv}) } @@ -225,7 +252,7 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res tripperware.Respo resp := http.Response{ Header: http.Header{ - "Content-Type": []string{"application/json"}, + "Content-Type": []string{tripperware.ApplicationJson}, }, Body: io.NopCloser(bytes.NewBuffer(b)), StatusCode: http.StatusOK, diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index 6981f5d7d4d..8ba197865f5 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -20,8 +20,8 @@ import ( ) var ( - PrometheusCodec = NewPrometheusCodec(false) - ShardedPrometheusCodec = NewPrometheusCodec(false) + PrometheusCodec = NewPrometheusCodec(false, "", "protobuf") + ShardedPrometheusCodec = NewPrometheusCodec(false, "", "protobuf") ) func TestRoundTrip(t *testing.T) { diff --git a/pkg/querier/tripperware/queryrange/query_range_test.go b/pkg/querier/tripperware/queryrange/query_range_test.go index f603e194946..f69457d2099 100644 --- a/pkg/querier/tripperware/queryrange/query_range_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_test.go @@ -4,12 +4,13 @@ import ( "bytes" "compress/gzip" "context" - "fmt" "io" "net/http" "strconv" "testing" + "github.com/gogo/protobuf/proto" + "github.com/prometheus/common/model" jsoniter "github.com/json-iterator/go" @@ -93,42 +94,193 @@ func TestResponse(t *testing.T) { r := *parsedResponse rWithWarnings := *parsedResponseWithWarnings rWithInfos := *parsedResponseWithInfos - r.Headers = respHeaders - rWithWarnings.Headers = respHeaders - rWithInfos.Headers = respHeaders - for i, tc := range []struct { - body string - expected *tripperware.PrometheusResponse + testCases := []struct { + promBody *tripperware.PrometheusResponse + jsonBody string expectedDecodeErr error cancelCtxBeforeDecode bool + isProtobuf bool }{ { - body: responseBody, - expected: &r, + promBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 137, TimestampMs: 1536673680000}, + {Value: 137, TimestampMs: 1536673780000}, + }, + }, + }, + }, + }, + }, + }, + }, + jsonBody: responseBody, + isProtobuf: true, }, { - body: responseBodyWithWarnings, - expected: &rWithWarnings, + 
promBody: &r, + jsonBody: responseBody, + isProtobuf: false, + }, + { + promBody: &tripperware.PrometheusResponse{ + Status: "success", + Warnings: []string{"test-warn"}, + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 137, TimestampMs: 1536673680000}, + {Value: 137, TimestampMs: 1536673780000}, + }, + }, + }, + }, + }, + }, + }, + }, + jsonBody: responseBodyWithWarnings, + isProtobuf: true, }, { - body: responseBodyWithInfos, - expected: &rWithInfos, + promBody: &rWithWarnings, + jsonBody: responseBodyWithWarnings, + isProtobuf: false, }, { - body: responseBody, + promBody: &tripperware.PrometheusResponse{ + Status: "success", + Infos: []string{"PromQL info: metric might not be a counter, name does not end in _total/_sum/_count/_bucket: \"go_gc_gogc_percent\" (1:6)"}, + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 137, TimestampMs: 1536673680000}, + {Value: 137, TimestampMs: 1536673780000}, + }, + }, + }, + }, + }, + }, + }, + }, + jsonBody: responseBodyWithInfos, + isProtobuf: true, + }, + { + promBody: &rWithInfos, + jsonBody: responseBodyWithInfos, + isProtobuf: false, + }, + { + promBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 137, TimestampMs: 1536673680000}, + {Value: 137, TimestampMs: 1536673780000}, + }, + }, + }, + }, + }, + }, + }, + }, cancelCtxBeforeDecode: true, expectedDecodeErr: context.Canceled, + isProtobuf: true, }, - } { + { + promBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 137, TimestampMs: 1536673680000}, + {Value: 137, TimestampMs: 1536673780000}, + }, + }, + }, + }, + }, + }, + }, + }, + cancelCtxBeforeDecode: true, + expectedDecodeErr: context.Canceled, + isProtobuf: false, + }, + } + for i, tc := range testCases { tc := tc t.Run(strconv.Itoa(i), func(t *testing.T) { t.Parallel() + protobuf, err := proto.Marshal(tc.promBody) + require.NoError(t, err) ctx, cancelCtx := context.WithCancel(context.Background()) - response := &http.Response{ - StatusCode: 200, - Header: http.Header{"Content-Type": []string{"application/json"}}, - Body: io.NopCloser(bytes.NewBuffer([]byte(tc.body))), + + var response *http.Response + if tc.isProtobuf { + response = &http.Response{ + StatusCode: 200, + Header: 
http.Header{"Content-Type": []string{tripperware.ApplicationProtobuf}}, + Body: io.NopCloser(bytes.NewBuffer(protobuf)), + } + tc.promBody.Headers = respHeadersProtobuf + } else { + response = &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{tripperware.ApplicationJson}}, + Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), + } + tc.promBody.Headers = respHeadersJson } + if tc.cancelCtxBeforeDecode { cancelCtx() } @@ -138,14 +290,15 @@ func TestResponse(t *testing.T) { cancelCtx() return } - assert.Equal(t, tc.expected, resp) + + assert.Equal(t, tc.promBody, resp) // Reset response, as the above call will have consumed the body reader. response = &http.Response{ StatusCode: 200, Header: http.Header{"Content-Type": []string{"application/json"}}, - Body: io.NopCloser(bytes.NewBuffer([]byte(tc.body))), - ContentLength: int64(len(tc.body)), + Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), + ContentLength: int64(len(tc.jsonBody)), } resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp) require.NoError(t, err) @@ -158,12 +311,13 @@ func TestResponse(t *testing.T) { func TestResponseWithStats(t *testing.T) { t.Parallel() for i, tc := range []struct { - body string - expected *tripperware.PrometheusResponse + promBody *tripperware.PrometheusResponse + jsonBody string + isProtobuf bool }{ { - body: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]],"peakSamples":16}}}}`, - expected: &tripperware.PrometheusResponse{ + jsonBody: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]],"peakSamples":16}}}}`, + promBody: &tripperware.PrometheusResponse{ Status: "success", Data: tripperware.PrometheusData{ ResultType: model.ValMatrix.String(), @@ -196,27 +350,79 @@ func TestResponseWithStats(t *testing.T) { }, }, }, + isProtobuf: true, + }, + { + jsonBody: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]],"peakSamples":16}}}}`, + promBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 137, TimestampMs: 1536673680000}, + {Value: 137, TimestampMs: 1536673780000}, + }, + }, + }, + }, + }, + }, + Stats: &tripperware.PrometheusResponseStats{ + Samples: &tripperware.PrometheusResponseSamplesStats{ + TotalQueryableSamples: 10, + TotalQueryableSamplesPerStep: []*tripperware.PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 5, TimestampMs: 1536673680000}, + {Value: 5, TimestampMs: 1536673780000}, + }, + PeakSamples: 16, + }, + }, + }, + }, + isProtobuf: false, }, } { tc := tc t.Run(strconv.Itoa(i), func(t *testing.T) { t.Parallel() - tc.expected.Headers = respHeaders - response := &http.Response{ - 
StatusCode: 200, - Header: http.Header{"Content-Type": []string{"application/json"}}, - Body: io.NopCloser(bytes.NewBuffer([]byte(tc.body))), + protobuf, err := proto.Marshal(tc.promBody) + require.NoError(t, err) + + var response *http.Response + if tc.isProtobuf { + response = &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{tripperware.ApplicationProtobuf}}, + Body: io.NopCloser(bytes.NewBuffer(protobuf)), + } + tc.promBody.Headers = respHeadersProtobuf + } else { + response = &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{tripperware.ApplicationJson}}, + Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), + } + tc.promBody.Headers = respHeadersJson } + resp, err := PrometheusCodec.DecodeResponse(context.Background(), response, nil) require.NoError(t, err) - assert.Equal(t, tc.expected, resp) + assert.Equal(t, tc.promBody, resp) // Reset response, as the above call will have consumed the body reader. response = &http.Response{ StatusCode: 200, Header: http.Header{"Content-Type": []string{"application/json"}}, - Body: io.NopCloser(bytes.NewBuffer([]byte(tc.body))), - ContentLength: int64(len(tc.body)), + Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), + ContentLength: int64(len(tc.jsonBody)), } resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp) require.NoError(t, err) @@ -986,62 +1192,100 @@ func TestMergeAPIResponses(t *testing.T) { } } -func TestGzippedResponse(t *testing.T) { +func TestCompressedResponse(t *testing.T) { t.Parallel() - for _, tc := range []struct { - body string - status int - err error + for i, tc := range []struct { + compression string + jsonBody string + promBody *tripperware.PrometheusResponse + status int + err error }{ { - body: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[2,"2"],[3,"3"]]}],"stats":{"samples":{"totalQueryableSamples":20,"totalQueryableSamplesPerStep":[[2,2],[3,3]],"peakSamples":10}}}}`, - status: 200, + compression: `gzip`, + promBody: &tripperware.PrometheusResponse{ + Status: StatusSuccess, + Data: tripperware.PrometheusData{ + ResultType: matrix, + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}}, + Samples: []cortexpb.Sample{ + {Value: 2, TimestampMs: 2000}, + {Value: 3, TimestampMs: 3000}, + }, + }, + }, + }, + }, + }, + Stats: &tripperware.PrometheusResponseStats{Samples: &tripperware.PrometheusResponseSamplesStats{ + TotalQueryableSamples: 20, + TotalQueryableSamplesPerStep: []*tripperware.PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 2, TimestampMs: 2000}, + {Value: 3, TimestampMs: 3000}, + }, + PeakSamples: 10, + }}, + }, + Headers: []*tripperware.PrometheusResponseHeader{}, + }, + jsonBody: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[2,"2"],[3,"3"]]}],"stats":{"samples":{"totalQueryableSamples":20,"totalQueryableSamplesPerStep":[[2,2],[3,3]],"peakSamples":10}}}}`, + status: 200, }, { - body: `error generic 400`, - status: 400, - err: httpgrpc.Errorf(400, `error generic 400`), + compression: `gzip`, + jsonBody: `error generic 400`, + status: 400, + err: httpgrpc.Errorf(400, `error generic 400`), }, { - status: 400, - err: httpgrpc.Errorf(400, ""), + compression: `gzip`, + status: 400, + err: 
httpgrpc.Errorf(400, ""), }, } { - for _, c := range []bool{true, false} { - c := c - t.Run(fmt.Sprintf("compressed %t [%s]", c, tc.body), func(t *testing.T) { - t.Parallel() - h := http.Header{ - "Content-Type": []string{"application/json"}, - } - - responseBody := bytes.NewBuffer([]byte(tc.body)) - if c { - h.Set("Content-Encoding", "gzip") - var buf bytes.Buffer - w := gzip.NewWriter(&buf) - _, err := w.Write([]byte(tc.body)) - require.NoError(t, err) - w.Close() - responseBody = &buf - } + t.Run(strconv.Itoa(i), func(t *testing.T) { + t.Parallel() + h := http.Header{} + var b []byte + if tc.promBody != nil { + protobuf, err := proto.Marshal(tc.promBody) + b = protobuf + require.NoError(t, err) + h.Set("Content-Type", tripperware.ApplicationProtobuf) + tc.promBody.Headers = append(tc.promBody.Headers, &tripperware.PrometheusResponseHeader{Name: "Content-Type", Values: []string{tripperware.ApplicationProtobuf}}) + } else { + b = []byte(tc.jsonBody) + h.Set("Content-Type", tripperware.ApplicationJson) + } - response := &http.Response{ - StatusCode: tc.status, - Header: h, - Body: io.NopCloser(responseBody), - } - r, err := PrometheusCodec.DecodeResponse(context.Background(), response, nil) - require.Equal(t, tc.err, err) + h.Set("Content-Encoding", tc.compression) + if tc.promBody != nil { + tc.promBody.Headers = append(tc.promBody.Headers, &tripperware.PrometheusResponseHeader{Name: "Content-Encoding", Values: []string{"gzip"}}) + } + responseBody := &bytes.Buffer{} + w := gzip.NewWriter(responseBody) + _, err := w.Write(b) + require.NoError(t, err) + w.Close() - if err == nil { - resp, err := json.Marshal(r) - require.NoError(t, err) + response := &http.Response{ + StatusCode: tc.status, + Header: h, + Body: io.NopCloser(responseBody), + } + resp, err := PrometheusCodec.DecodeResponse(context.Background(), response, nil) + require.Equal(t, tc.err, err) - require.Equal(t, tc.body, string(resp)) - } - }) - } + if err == nil { + require.NoError(t, err) + require.Equal(t, tc.promBody.Data, resp.(*tripperware.PrometheusResponse).Data) + } + }) } } diff --git a/pkg/querier/tripperware/queryrange/results_cache_test.go b/pkg/querier/tripperware/queryrange/results_cache_test.go index 3d1e9f95748..51b68531b5f 100644 --- a/pkg/querier/tripperware/queryrange/results_cache_test.go +++ b/pkg/querier/tripperware/queryrange/results_cache_test.go @@ -60,12 +60,20 @@ var ( Query: "sum(container_memory_rss) by (namespace)", CachingOptions: tripperware.CachingOptions{Disabled: true}, } - respHeaders = []*tripperware.PrometheusResponseHeader{ + respHeadersJson = []*tripperware.PrometheusResponseHeader{ { Name: "Content-Type", - Values: []string{"application/json"}, + Values: []string{tripperware.ApplicationJson}, }, } + + respHeadersProtobuf = []*tripperware.PrometheusResponseHeader{ + { + Name: "Content-Type", + Values: []string{tripperware.ApplicationProtobuf}, + }, + } + parsedResponse = &tripperware.PrometheusResponse{ Status: "success", Data: tripperware.PrometheusData{ @@ -114,6 +122,7 @@ var ( }, }, } + parsedResponseWithInfos = &tripperware.PrometheusResponse{ Status: "success", Infos: []string{"PromQL info: metric might not be a counter, name does not end in _total/_sum/_count/_bucket: \"go_gc_gogc_percent\" (1:6)"},