Add protobuf codec for query range and instant query responses #5527


Merged: 42 commits, Sep 25, 2024
Commits
b1d4793
replace json data conversion with protobuf for querier handler
afhassan Jun 29, 2023
c54f8f5
add stats to PrometheusResponse created from query data
afhassan Jul 12, 2023
cabd489
add conversion from query data to PrometheusInstantQueryResponse in q…
afhassan Jul 12, 2023
b843075
remove endpoints not used by query api and add it as a handler for /q…
afhassan Jul 13, 2023
f1eddd9
add snappy compression
afhassan Jul 13, 2023
855375f
improve code readability in querier handler
afhassan Jul 14, 2023
6884a45
add Query and QueryRange handler functions directly to router
afhassan Jul 18, 2023
f83b731
add scalar and string result types to instant query handler
afhassan Jul 21, 2023
112312b
reuse time parsing functions for querier handler
afhassan Jul 28, 2023
9717483
improve querier handler code readability
afhassan Jul 28, 2023
581847a
reuse thanos api struct definitions for querier handler
afhassan Aug 2, 2023
a6e8e04
return json response for unsharded querier requests
afhassan Aug 9, 2023
45c85a7
remove header copying in codec for querier handler
afhassan Aug 9, 2023
7af52db
change instant query context cancelled unit test to test new querier …
afhassan Aug 21, 2023
ad7e03c
handle empty response errors
afhassan Aug 14, 2023
5d7ada2
add config to specify compression type for querier handler response
afhassan Aug 16, 2023
661b078
change unit tests to work with new querier handler
afhassan Aug 21, 2023
78b12ab
fix time parsing in querier handler
afhassan Aug 23, 2023
a55d23a
add unit tests for querier handler
afhassan Aug 23, 2023
3d35da3
add feature flag for querier handler
afhassan Aug 24, 2023
9f77e36
refactor querier unit tests
afhassan Aug 6, 2024
17f325c
Merge branch 'cortexproject:master' into master
afhassan Aug 6, 2024
1c7397e
add protobuf codec to reuse prometheus handler for querier
afhassan Aug 9, 2024
8c4dafe
Merge remote-tracking branch 'upstream/master'
afhassan Sep 4, 2024
a466d86
refactor to use unified protobuf struct for both range and instant qu…
afhassan Sep 11, 2024
018c931
fix issue with sample pointers
afhassan Sep 11, 2024
dea9806
Merge remote-tracking branch 'upstream/master'
afhassan Sep 13, 2024
0afa97d
fix protobuf tests to expect peakSamples
afhassan Sep 13, 2024
4da5cb0
refactor protobuf config logic
afhassan Sep 17, 2024
a917d28
refactor protobuf codec
afhassan Sep 17, 2024
2f1b315
add config validation
afhassan Sep 18, 2024
6a457e7
remove snappy compression tests
afhassan Sep 18, 2024
7898191
fix linter formatting
afhassan Sep 18, 2024
9b6ef8c
fix protobuf empty slice decoding
afhassan Sep 19, 2024
4901a28
add integration test for protobuf codec
afhassan Sep 20, 2024
0a7d807
add gzip compression to integration test
afhassan Sep 23, 2024
8c9eaf0
Merge remote-tracking branch 'upstream/master'
afhassan Sep 24, 2024
0f6e8bd
fix recompile proto files
afhassan Sep 24, 2024
7730ad2
update config docs
afhassan Sep 24, 2024
5d8658e
add changelog for protobuf codec
afhassan Sep 24, 2024
91e1366
make gzip default compression
afhassan Sep 25, 2024
390e370
add protobuf codec to experimental features
afhassan Sep 25, 2024
5 changes: 5 additions & 0 deletions docs/blocks-storage/querier.md
@@ -126,6 +126,11 @@ querier:
# CLI flag: -querier.per-step-stats-enabled
[per_step_stats_enabled: <boolean> | default = false]

# Use compression for metrics query API or instant and range query APIs.
# Supports 'gzip' and '' (disable compression)
# CLI flag: -querier.response-compression
[response_compression: <string> | default = ""]

# The time after which a metric should be queried from storage and not just
# ingesters. 0 means all queries are sent to store. When running the blocks
# storage, if this option is enabled, the time range of the query sent to the
10 changes: 10 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -95,6 +95,11 @@ api:
# CLI flag: -api.build-info-enabled
[build_info_enabled: <boolean> | default = false]

# Choose default codec for querier response serialization. Supports 'json' and
# 'protobuf'.
# CLI flag: -api.querier-default-codec
[querier_default_codec: <string> | default = "json"]

# The server_config configures the HTTP and gRPC server of the launched
# service(s).
[server: <server_config>]
@@ -3718,6 +3723,11 @@ The `querier_config` configures the Cortex querier.
# CLI flag: -querier.per-step-stats-enabled
[per_step_stats_enabled: <boolean> | default = false]

# Use compression for metrics query API or instant and range query APIs.
# Supports 'gzip' and '' (disable compression)
# CLI flag: -querier.response-compression
[response_compression: <string> | default = ""]

# The time after which a metric should be queried from storage and not just
# ingesters. 0 means all queries are sent to store. When running the blocks
# storage, if this option is enabled, the time range of the query sent to the
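Taken together, the two new options introduced by this PR can be combined in a config file. The fragment below is an illustrative sketch (values are examples, not defaults):

```yaml
api:
  # Serialize querier responses with the protobuf codec when the client
  # (e.g. the query-frontend) asks for it. Default is "json".
  querier_default_codec: "protobuf"

querier:
  # Compress querier responses. Supports "gzip" and "" (disable compression).
  response_compression: "gzip"
```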
20 changes: 20 additions & 0 deletions integration/query_frontend_test.go
@@ -203,6 +203,26 @@ func TestQueryFrontendWithVerticalShardingQueryScheduler(t *testing.T) {
})
}

func TestQueryFrontendProtobufCodec(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
testMissingMetricName: false,
querySchedulerEnabled: true,
queryStatsEnabled: true,
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))

minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

flags = mergeFlags(e2e.EmptyFlags(), map[string]string{
"-api.querier-default-codec": "protobuf",
"-querier.response-compression": "gzip",
})
return cortexConfigFile, flags
},
})
}

func TestQueryFrontendRemoteRead(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
remoteReadEnabled: true,
16 changes: 16 additions & 0 deletions pkg/api/api.go
@@ -12,6 +12,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/regexp"
"github.com/klauspost/compress/gzhttp"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/httputil"
@@ -73,13 +74,20 @@ type Config struct {
corsRegexString string `yaml:"cors_origin"`

buildInfoEnabled bool `yaml:"build_info_enabled"`

QuerierDefaultCodec string `yaml:"querier_default_codec"`
}

var (
errUnsupportedDefaultCodec = errors.New("unsupported default codec type. Supported types are 'json' and 'protobuf'")
)

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ResponseCompression, "api.response-compression-enabled", false, "Use GZIP compression for API responses. Some endpoints serve large YAML or JSON blobs which can benefit from compression.")
f.Var(&cfg.HTTPRequestHeadersToLog, "api.http-request-headers-to-log", "Which HTTP Request headers to add to logs")
f.BoolVar(&cfg.buildInfoEnabled, "api.build-info-enabled", false, "If enabled, build Info API will be served by query frontend or querier.")
f.StringVar(&cfg.QuerierDefaultCodec, "api.querier-default-codec", "json", "Choose default codec for querier response serialization. Supports 'json' and 'protobuf'.")
cfg.RegisterFlagsWithPrefix("", f)
}

@@ -90,6 +98,14 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.corsRegexString, prefix+"server.cors-origin", ".*", `Regex for CORS origin. It is fully anchored. Example: 'https?://(domain1|domain2)\.com'`)
}

// Validate checks that the API config is valid.
func (cfg *Config) Validate() error {
if cfg.QuerierDefaultCodec != "json" && cfg.QuerierDefaultCodec != "protobuf" {
return errUnsupportedDefaultCodec
}
return nil
}

// Push either wraps the distributor push function as configured or returns the distributor push directly.
func (cfg *Config) wrapDistributorPush(d *distributor.Distributor) push.Func {
if cfg.DistributorPushWrapper != nil {
4 changes: 4 additions & 0 deletions pkg/api/handlers.go
@@ -26,6 +26,7 @@ import (
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/codec"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util"
)
@@ -231,6 +232,9 @@ func NewQuerierHandler(
false,
)

	// The JSON codec is installed by default. Install the protobuf codec as well so clients can request either format.
api.InstallCodec(codec.ProtobufCodec{})

router := mux.NewRouter()

// Use a separate metric for the querier in order to differentiate requests from the query-frontend when
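For context on the `InstallCodec` call above: the Prometheus v1 API selects among its installed codecs using the request's `Accept` header together with each codec's `CanEncode` check, falling back to JSON otherwise. The sketch below is a simplified, self-contained illustration of that negotiation; the `Codec` interface and `negotiate` function here are stand-ins, not the real Prometheus API.

```go
package main

import (
	"fmt"
	"strings"
)

// Codec is a simplified stand-in for the codec interface the Prometheus
// v1 API uses for response serialization.
type Codec interface {
	ContentType() string
	CanEncode(hasError bool) bool
}

type jsonCodec struct{}

func (jsonCodec) ContentType() string { return "application/json" }
func (jsonCodec) CanEncode(bool) bool { return true }

type protobufCodec struct{}

func (protobufCodec) ContentType() string { return "application/x-protobuf" }

// Error responses fall back to JSON, mirroring the CanEncode check in this PR.
func (protobufCodec) CanEncode(hasError bool) bool { return !hasError }

// negotiate returns the first installed codec whose content type appears in
// the Accept header and which can encode the response; JSON is the default.
func negotiate(accept string, hasError bool, codecs []Codec) Codec {
	for _, c := range codecs {
		if strings.Contains(accept, c.ContentType()) && c.CanEncode(hasError) {
			return c
		}
	}
	return jsonCodec{}
}

func main() {
	codecs := []Codec{protobufCodec{}, jsonCodec{}}
	fmt.Println(negotiate("application/x-protobuf", false, codecs).ContentType()) // application/x-protobuf
	fmt.Println(negotiate("application/x-protobuf", true, codecs).ContentType())  // application/json
}
```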
3 changes: 3 additions & 0 deletions pkg/cortex/cortex.go
@@ -182,6 +182,9 @@ func (c *Config) Validate(log log.Logger) error {
return errInvalidHTTPPrefix
}

if err := c.API.Validate(); err != nil {
return errors.Wrap(err, "invalid api config")
}
if err := c.Storage.Validate(); err != nil {
return errors.Wrap(err, "invalid storage config")
}
6 changes: 3 additions & 3 deletions pkg/cortex/modules.go
@@ -442,9 +442,9 @@ func (t *Cortex) initFlusher() (serv services.Service, err error) {
func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
queryAnalyzer := querysharding.NewQueryAnalyzer()
// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
prometheusCodec := queryrange.NewPrometheusCodec(false)
prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
// ShardedPrometheusCodec is the same as PrometheusCodec but is used on sharded queries (it sums up the stats)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)

queryRangeMiddlewares, cache, err := queryrange.Middlewares(
t.Cfg.QueryRange,
@@ -472,7 +472,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
queryRangeMiddlewares,
instantQueryMiddlewares,
prometheusCodec,
instantquery.InstantQueryCodec,
instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec),
t.Overrides,
queryAnalyzer,
t.Cfg.Querier.DefaultEvaluationInterval,
161 changes: 161 additions & 0 deletions pkg/querier/codec/protobuf_codec.go
@@ -0,0 +1,161 @@
package codec

import (
"github.com/gogo/protobuf/proto"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/stats"
v1 "github.com/prometheus/prometheus/web/api/v1"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

type ProtobufCodec struct{}

func (p ProtobufCodec) ContentType() v1.MIMEType {
return v1.MIMEType{Type: "application", SubType: "x-protobuf"}
}

func (p ProtobufCodec) CanEncode(resp *v1.Response) bool {
	// Error responses are handled by the default JSON codec
if resp.Error != "" || resp.Data == nil {
return false
}
return true
}

func (p ProtobufCodec) Encode(resp *v1.Response) ([]byte, error) {
prometheusQueryResponse, err := createPrometheusQueryResponse(resp)
if err != nil {
return []byte{}, err
}
b, err := proto.Marshal(prometheusQueryResponse)
return b, err
}

func createPrometheusQueryResponse(resp *v1.Response) (*tripperware.PrometheusResponse, error) {
var data = resp.Data.(*v1.QueryData)

var queryResult tripperware.PrometheusQueryResult
switch string(data.ResultType) {
case model.ValMatrix.String():
queryResult.Result = &tripperware.PrometheusQueryResult_Matrix{
Matrix: &tripperware.Matrix{
SampleStreams: *getMatrixSampleStreams(data),
},
}
case model.ValVector.String():
queryResult.Result = &tripperware.PrometheusQueryResult_Vector{
Vector: &tripperware.Vector{
Samples: *getVectorSamples(data),
},
}
default:
json := jsoniter.ConfigCompatibleWithStandardLibrary
rawBytes, err := json.Marshal(data)
if err != nil {
return nil, err
}
queryResult.Result = &tripperware.PrometheusQueryResult_RawBytes{RawBytes: rawBytes}
}

var stats *tripperware.PrometheusResponseStats
if data.Stats != nil {
builtin := data.Stats.Builtin()
stats = &tripperware.PrometheusResponseStats{Samples: getStats(&builtin)}
}

return &tripperware.PrometheusResponse{
Status: string(resp.Status),
Data: tripperware.PrometheusData{
ResultType: string(data.ResultType),
Result: queryResult,
Stats: stats,
},
ErrorType: string(resp.ErrorType),
Error: resp.Error,
Warnings: resp.Warnings,
}, nil
}

func getMatrixSampleStreams(data *v1.QueryData) *[]tripperware.SampleStream {
	matrix := data.Result.(promql.Matrix)
	sampleStreams := make([]tripperware.SampleStream, len(matrix))

	for i, series := range matrix {
		var labels []cortexpb.LabelAdapter
		if len(series.Metric) > 0 {
			labels = make([]cortexpb.LabelAdapter, len(series.Metric))
			for j, l := range series.Metric {
				labels[j] = cortexpb.LabelAdapter{
					Name:  l.Name,
					Value: l.Value,
				}
			}
		}

		var samples []cortexpb.Sample
		if len(series.Floats) > 0 {
			samples = make([]cortexpb.Sample, len(series.Floats))
			for j, f := range series.Floats {
				samples[j] = cortexpb.Sample{
					Value:       f.F,
					TimestampMs: f.T,
				}
			}
		}
		sampleStreams[i] = tripperware.SampleStream{Labels: labels, Samples: samples}
	}
	return &sampleStreams
}

func getVectorSamples(data *v1.QueryData) *[]tripperware.Sample {
	vector := data.Result.(promql.Vector)
	vectorSamples := make([]tripperware.Sample, len(vector))

	for i, sample := range vector {
		var labels []cortexpb.LabelAdapter
		if len(sample.Metric) > 0 {
			labels = make([]cortexpb.LabelAdapter, len(sample.Metric))
			for j, l := range sample.Metric {
				labels[j] = cortexpb.LabelAdapter{
					Name:  l.Name,
					Value: l.Value,
				}
			}
		}

		vectorSamples[i] = tripperware.Sample{
			Labels: labels,
			Sample: &cortexpb.Sample{
				TimestampMs: sample.T,
				Value:       sample.F,
			},
		}
	}
	return &vectorSamples
}

func getStats(builtin *stats.BuiltinStats) *tripperware.PrometheusResponseSamplesStats {
queryableSamplesStatsPerStepLen := len(builtin.Samples.TotalQueryableSamplesPerStep)
queryableSamplesStatsPerStep := make([]*tripperware.PrometheusResponseQueryableSamplesStatsPerStep, queryableSamplesStatsPerStepLen)
for i := 0; i < queryableSamplesStatsPerStepLen; i++ {
queryableSamplesStatsPerStep[i] = &tripperware.PrometheusResponseQueryableSamplesStatsPerStep{
Value: builtin.Samples.TotalQueryableSamplesPerStep[i].V,
TimestampMs: builtin.Samples.TotalQueryableSamplesPerStep[i].T,
}
}

statSamples := tripperware.PrometheusResponseSamplesStats{
TotalQueryableSamples: builtin.Samples.TotalQueryableSamples,
TotalQueryableSamplesPerStep: queryableSamplesStatsPerStep,
PeakSamples: int64(builtin.Samples.PeakSamples),
}

return &statSamples
}
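The conversion helpers above walk the promql result once and pre-size every output slice, avoiding the JSON encode/decode round trip that the default codec incurs. The stdlib-only sketch below illustrates that pattern with simplified stand-ins for `promql.Series` and the `tripperware` protobuf types (these types are illustrative, not the real ones):

```go
package main

import "fmt"

// FloatPoint and Series are simplified stand-ins for promql types.
type FloatPoint struct {
	T int64
	F float64
}

type Series struct {
	Floats []FloatPoint
}

// Sample is a simplified stand-in for the tripperware protobuf sample type.
type Sample struct {
	Value       float64
	TimestampMs int64
}

// flatten mirrors the shape of getMatrixSampleStreams: pre-size the output
// slice and make a single pass over the points, with no intermediate JSON.
func flatten(series Series) []Sample {
	out := make([]Sample, len(series.Floats))
	for i, p := range series.Floats {
		out[i] = Sample{Value: p.F, TimestampMs: p.T}
	}
	return out
}

func main() {
	s := Series{Floats: []FloatPoint{{T: 1000, F: 1.5}, {T: 2000, F: 2.5}}}
	fmt.Println(flatten(s)) // [{1.5 1000} {2.5 2000}]
}
```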
9 changes: 9 additions & 0 deletions pkg/querier/querier.go
@@ -50,6 +50,9 @@ type Config struct {
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
EnablePerStepStats bool `yaml:"per_step_stats_enabled"`

// Use compression for metrics query API or instant and range query APIs.
ResponseCompression string `yaml:"response_compression"`

// QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters.
QueryStoreAfter time.Duration `yaml:"query_store_after"`
MaxQueryIntoFuture time.Duration `yaml:"max_query_into_future"`
@@ -90,6 +93,7 @@ var (
errBadLookbackConfigs = errors.New("bad settings, query_store_after >= query_ingesters_within which can result in queries not being sent")
errShuffleShardingLookbackLessThanQueryStoreAfter = errors.New("the shuffle-sharding lookback period should be greater or equal than the configured 'query store after'")
errEmptyTimeRange = errors.New("empty time range")
errUnsupportedResponseCompression = errors.New("unsupported response compression. Supported compression 'gzip' and '' (disable compression)")
)

// RegisterFlags adds the flags required to config this to the given FlagSet.
@@ -111,6 +115,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.")
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.BoolVar(&cfg.EnablePerStepStats, "querier.per-step-stats-enabled", false, "Enable returning samples stats per steps in query response.")
f.StringVar(&cfg.ResponseCompression, "querier.response-compression", "", "Use compression for metrics query API or instant and range query APIs. Supports 'gzip' and '' (disable compression)")
f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.")
f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.")
f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. When running the blocks storage, if this option is enabled, the time range of the query sent to the store will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.")
@@ -133,6 +138,10 @@ func (cfg *Config) Validate() error {
}
}

if cfg.ResponseCompression != "" && cfg.ResponseCompression != "gzip" {
return errUnsupportedResponseCompression
}

if cfg.ShuffleShardingIngestersLookbackPeriod > 0 {
if cfg.ShuffleShardingIngestersLookbackPeriod < cfg.QueryStoreAfter {
return errShuffleShardingLookbackLessThanQueryStoreAfter