diff --git a/CHANGELOG.md b/CHANGELOG.md
index 82f0a9367e..af865528c6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
 * [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
 * [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
 * [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
+* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228
 * [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
 * [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
 * [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188
diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go
index 8eff8bd30e..039b9c052f 100644
--- a/pkg/api/handlers.go
+++ b/pkg/api/handlers.go
@@ -226,7 +226,7 @@ func NewQuerierHandler(
 		// This is used for the stats API which we should not support. Or find other ways to.
 		prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
 		reg,
-		nil,
+		querier.StatsRenderer,
 		false,
 		nil,
 		false,
diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go
index 64c7b55f4b..19d65c302f 100644
--- a/pkg/frontend/transport/handler.go
+++ b/pkg/frontend/transport/handler.go
@@ -89,13 +89,15 @@ type Handler struct {
 	roundTripper http.RoundTripper
 
 	// Metrics.
-	querySeconds    *prometheus.CounterVec
-	querySeries     *prometheus.CounterVec
-	querySamples    *prometheus.CounterVec
-	queryChunkBytes *prometheus.CounterVec
-	queryDataBytes  *prometheus.CounterVec
-	rejectedQueries *prometheus.CounterVec
-	activeUsers     *util.ActiveUsersCleanupService
+	querySeconds        *prometheus.CounterVec
+	querySeries         *prometheus.CounterVec
+	queryFetchedSamples *prometheus.CounterVec
+	queryScannedSamples *prometheus.CounterVec
+	queryPeakSamples    *prometheus.HistogramVec
+	queryChunkBytes     *prometheus.CounterVec
+	queryDataBytes      *prometheus.CounterVec
+	rejectedQueries     *prometheus.CounterVec
+	activeUsers         *util.ActiveUsersCleanupService
 }
 
 // NewHandler creates a new frontend handler.
@@ -117,11 +119,25 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
 		Help: "Number of series fetched to execute a query.",
 	}, []string{"user"})
 
-	h.querySamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+	h.queryFetchedSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: "cortex_query_samples_total",
 		Help: "Number of samples fetched to execute a query.",
 	}, []string{"user"})
 
+	// It tracks TotalSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go#L237 for each user.
+	h.queryScannedSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+		Name: "cortex_query_samples_scanned_total",
+		Help: "Number of samples scanned to execute a query.",
+	}, []string{"user"})
+
+	h.queryPeakSamples = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+		Name:                            "cortex_query_peak_samples",
+		Help:                            "Highest count of samples considered to execute a query.",
+		NativeHistogramBucketFactor:     1.1,
+		NativeHistogramMaxBucketNumber:  100,
+		NativeHistogramMinResetDuration: 1 * time.Hour,
+	}, []string{"user"})
+
 	h.queryChunkBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: "cortex_query_fetched_chunks_bytes_total",
 		Help: "Size of all chunks fetched to execute a query in bytes.",
@@ -143,7 +159,9 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
 	h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
 		h.querySeconds.DeleteLabelValues(user)
 		h.querySeries.DeleteLabelValues(user)
-		h.querySamples.DeleteLabelValues(user)
+		h.queryFetchedSamples.DeleteLabelValues(user)
+		h.queryScannedSamples.DeleteLabelValues(user)
+		h.queryPeakSamples.DeleteLabelValues(user)
 		h.queryChunkBytes.DeleteLabelValues(user)
 		h.queryDataBytes.DeleteLabelValues(user)
 		if err := util.DeleteMatchingLabels(h.rejectedQueries, map[string]string{"user": user}); err != nil {
@@ -301,6 +319,8 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
 	numSeries := stats.LoadFetchedSeries()
 	numChunks := stats.LoadFetchedChunks()
 	numSamples := stats.LoadFetchedSamples()
+	numScannedSamples := stats.LoadScannedSamples()
+	numPeakSamples := stats.LoadPeakSamples()
 	numChunkBytes := stats.LoadFetchedChunkBytes()
 	numDataBytes := stats.LoadFetchedDataBytes()
 	numStoreGatewayTouchedPostings := stats.LoadStoreGatewayTouchedPostings()
@@ -312,7 +332,9 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
 	// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds()) f.querySeries.WithLabelValues(userID).Add(float64(numSeries)) - f.querySamples.WithLabelValues(userID).Add(float64(numSamples)) + f.queryFetchedSamples.WithLabelValues(userID).Add(float64(numSamples)) + f.queryScannedSamples.WithLabelValues(userID).Add(float64(numScannedSamples)) + f.queryPeakSamples.WithLabelValues(userID).Observe(float64(numPeakSamples)) f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes)) f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes)) f.activeUsers.UpdateUserTimestamp(userID, time.Now()) diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index b1933ffc86..d394fb2670 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -188,7 +188,7 @@ func TestHandler_ServeHTTP(t *testing.T) { { name: "test handler with stats enabled", cfg: HandlerConfig{QueryStatsEnabled: true}, - expectedMetrics: 4, + expectedMetrics: 6, roundTripperFunc: roundTripper, expectedStatusCode: http.StatusOK, }, @@ -202,7 +202,7 @@ func TestHandler_ServeHTTP(t *testing.T) { { name: "test handler with reasonResponseTooLarge", cfg: HandlerConfig{QueryStatsEnabled: true}, - expectedMetrics: 4, + expectedMetrics: 6, roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusRequestEntityTooLarge, @@ -218,7 +218,7 @@ func TestHandler_ServeHTTP(t *testing.T) { { name: "test handler with reasonTooManyRequests", cfg: HandlerConfig{QueryStatsEnabled: true}, - expectedMetrics: 4, + expectedMetrics: 6, roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusTooManyRequests, @@ -234,7 +234,7 @@ func TestHandler_ServeHTTP(t *testing.T) { { name: "test handler with reasonTooManySamples", cfg: HandlerConfig{QueryStatsEnabled: true}, - expectedMetrics: 4, + expectedMetrics: 6, roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusUnprocessableEntity, @@ -250,7 +250,7 @@ func TestHandler_ServeHTTP(t *testing.T) { { name: "test handler with reasonTooLongRange", cfg: HandlerConfig{QueryStatsEnabled: true}, - expectedMetrics: 4, + expectedMetrics: 6, roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusUnprocessableEntity, @@ -266,7 +266,7 @@ func TestHandler_ServeHTTP(t *testing.T) { { name: "test handler with reasonSeriesFetched", cfg: HandlerConfig{QueryStatsEnabled: true}, - expectedMetrics: 4, + expectedMetrics: 6, roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusUnprocessableEntity, @@ -282,7 +282,7 @@ func TestHandler_ServeHTTP(t *testing.T) { { name: "test handler with reasonChunksFetched", cfg: HandlerConfig{QueryStatsEnabled: true}, - expectedMetrics: 4, + expectedMetrics: 6, roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusUnprocessableEntity, @@ -298,7 +298,7 @@ func TestHandler_ServeHTTP(t *testing.T) { { name: "test handler with reasonChunkBytesFetched", cfg: HandlerConfig{QueryStatsEnabled: true}, - expectedMetrics: 4, + expectedMetrics: 6, roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: 
http.StatusUnprocessableEntity, @@ -314,7 +314,7 @@ func TestHandler_ServeHTTP(t *testing.T) { { name: "test handler with reasonDataBytesFetched", cfg: HandlerConfig{QueryStatsEnabled: true}, - expectedMetrics: 4, + expectedMetrics: 6, roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusUnprocessableEntity, @@ -330,7 +330,7 @@ func TestHandler_ServeHTTP(t *testing.T) { { name: "test handler with reasonSeriesLimitStoreGateway", cfg: HandlerConfig{QueryStatsEnabled: true}, - expectedMetrics: 4, + expectedMetrics: 6, roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusUnprocessableEntity, @@ -346,7 +346,7 @@ func TestHandler_ServeHTTP(t *testing.T) { { name: "test handler with reasonChunksLimitStoreGateway", cfg: HandlerConfig{QueryStatsEnabled: true}, - expectedMetrics: 4, + expectedMetrics: 6, roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusUnprocessableEntity, @@ -362,7 +362,7 @@ func TestHandler_ServeHTTP(t *testing.T) { { name: "test handler with reasonBytesLimitStoreGateway", cfg: HandlerConfig{QueryStatsEnabled: true}, - expectedMetrics: 4, + expectedMetrics: 6, roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusUnprocessableEntity, @@ -395,6 +395,8 @@ func TestHandler_ServeHTTP(t *testing.T) { "cortex_query_fetched_series_total", "cortex_query_samples_total", "cortex_query_fetched_chunks_bytes_total", + "cortex_query_samples_scanned_total", + "cortex_query_peak_samples", ) assert.NoError(t, err) diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index f0d4a35f5d..1559731dad 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -302,6 +302,46 @@ func (s *QueryStats) LoadStoreGatewayTouchedPostingBytes() uint64 { return atomic.LoadUint64(&s.StoreGatewayTouchedPostingBytes) } +func (s *QueryStats) AddScannedSamples(count uint64) { + if s == nil { + return + } + + atomic.AddUint64(&s.ScannedSamples, count) +} + +func (s *QueryStats) LoadScannedSamples() uint64 { + if s == nil { + return 0 + } + + return atomic.LoadUint64(&s.ScannedSamples) +} + +func (s *QueryStats) AddPeakSamples(count uint64) { + if s == nil { + return + } + + atomic.AddUint64(&s.PeakSamples, count) +} + +func (s *QueryStats) SetPeakSamples(count uint64) { + if s == nil { + return + } + + atomic.StoreUint64(&s.PeakSamples, count) +} + +func (s *QueryStats) LoadPeakSamples() uint64 { + if s == nil { + return 0 + } + + return atomic.LoadUint64(&s.PeakSamples) +} + // Merge the provided Stats into this one. func (s *QueryStats) Merge(other *QueryStats) { if s == nil || other == nil { @@ -317,6 +357,8 @@ func (s *QueryStats) Merge(other *QueryStats) { s.AddFetchedChunks(other.LoadFetchedChunks()) s.AddStoreGatewayTouchedPostings(other.LoadStoreGatewayTouchedPostings()) s.AddStoreGatewayTouchedPostingBytes(other.LoadStoreGatewayTouchedPostingBytes()) + s.AddScannedSamples(other.LoadScannedSamples()) + s.SetPeakSamples(max(s.LoadPeakSamples(), other.LoadPeakSamples())) s.AddExtraFields(other.LoadExtraFields()...) 
} diff --git a/pkg/querier/stats/stats.pb.go b/pkg/querier/stats/stats.pb.go index eda53692cb..ea6f4ba418 100644 --- a/pkg/querier/stats/stats.pb.go +++ b/pkg/querier/stats/stats.pb.go @@ -59,6 +59,12 @@ type Stats struct { // The total size of postings touched in store gateway for a specific query, in bytes. // Only successful requests from querier to store gateway are included. StoreGatewayTouchedPostingBytes uint64 `protobuf:"varint,12,opt,name=store_gateway_touched_posting_bytes,json=storeGatewayTouchedPostingBytes,proto3" json:"store_gateway_touched_posting_bytes,omitempty"` + // The total number of samples scanned while evaluating a query. + // Equal to TotalSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go + ScannedSamples uint64 `protobuf:"varint,13,opt,name=scanned_samples,json=scannedSamples,proto3" json:"scanned_samples,omitempty"` + // The highest count of samples considered while evaluating a query. + // Equal to PeakSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go + PeakSamples uint64 `protobuf:"varint,14,opt,name=peak_samples,json=peakSamples,proto3" json:"peak_samples,omitempty"` } func (m *Stats) Reset() { *m = Stats{} } @@ -177,6 +183,20 @@ func (m *Stats) GetStoreGatewayTouchedPostingBytes() uint64 { return 0 } +func (m *Stats) GetScannedSamples() uint64 { + if m != nil { + return m.ScannedSamples + } + return 0 +} + +func (m *Stats) GetPeakSamples() uint64 { + if m != nil { + return m.PeakSamples + } + return 0 +} + func init() { proto.RegisterType((*Stats)(nil), "stats.Stats") proto.RegisterMapType((map[string]string)(nil), "stats.Stats.ExtraFieldsEntry") @@ -185,41 +205,43 @@ func init() { func init() { proto.RegisterFile("stats.proto", fileDescriptor_b4756a0aec8b9d44) } var fileDescriptor_b4756a0aec8b9d44 = []byte{ - // 544 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xcf, 0x6e, 0xd3, 0x40, - 0x10, 0xc6, 0xbd, 0xcd, 0x1f, 0xe2, 0x4d, 0x90, 0x82, 0x09, 0xc2, 0x8d, 0xc4, 0x26, 0x50, 0x0e, - 0x39, 0x20, 0x07, 0x85, 0x0b, 0x02, 0x09, 0x55, 0x69, 0x0b, 0x1c, 0x10, 0x82, 0xa4, 0x12, 0x52, - 0x2f, 0xab, 0x4d, 0xb2, 0x71, 0xac, 0x3a, 0xde, 0x60, 0xaf, 0x29, 0xbe, 0x21, 0xf1, 0x02, 0x1c, - 0x79, 0x04, 0x1e, 0x25, 0xc7, 0x1c, 0x7b, 0x2a, 0xc4, 0xb9, 0x70, 0xec, 0x23, 0xa0, 0x9d, 0xb5, - 0x5b, 0xa8, 0x04, 0xe2, 0xe6, 0x9d, 0xdf, 0x37, 0x9f, 0xf6, 0x9b, 0x59, 0xe3, 0x6a, 0x24, 0x99, - 0x8c, 0x9c, 0x45, 0x28, 0xa4, 0xb0, 0x4a, 0x70, 0x68, 0x36, 0x5c, 0xe1, 0x0a, 0xa8, 0x74, 0xd5, - 0x97, 0x86, 0x4d, 0xe2, 0x0a, 0xe1, 0xfa, 0xbc, 0x0b, 0xa7, 0x51, 0x3c, 0xed, 0x4e, 0xe2, 0x90, - 0x49, 0x4f, 0x04, 0x19, 0xdf, 0xbe, 0xca, 0x59, 0x90, 0x68, 0x74, 0xef, 0x73, 0x19, 0x97, 0x86, - 0xca, 0xda, 0xda, 0xc5, 0xe6, 0x09, 0xf3, 0x7d, 0x2a, 0xbd, 0x39, 0xb7, 0x51, 0x1b, 0x75, 0xaa, - 0xbd, 0x6d, 0x47, 0x37, 0x3a, 0x79, 0xa3, 0xb3, 0x9f, 0x19, 0xf7, 0x2b, 0xcb, 0xb3, 0x96, 0xf1, - 0xf5, 0x7b, 0x0b, 0x0d, 0x2a, 0xaa, 0xeb, 0xd0, 0x9b, 0x73, 0xeb, 0x21, 0x6e, 0x4c, 0xb9, 0x1c, - 0xcf, 0xf8, 0x84, 0x46, 0x3c, 0xf4, 0x78, 0x44, 0xc7, 0x22, 0x0e, 0xa4, 0xbd, 0xd5, 0x46, 0x9d, - 0xe2, 0xc0, 0xca, 0xd8, 0x10, 0xd0, 0x9e, 0x22, 0x96, 0x83, 0x6f, 0xe6, 0x1d, 0xe3, 0x59, 0x1c, - 0x1c, 0xd3, 0x51, 0x22, 0x79, 0x64, 0x17, 0xa0, 0xe1, 0x46, 0x86, 0xf6, 0x14, 0xe9, 0x2b, 0x60, - 0x3d, 0xc0, 0xb9, 0x0b, 0x9d, 0x30, 0xc9, 0x32, 0x79, 0x11, 0xe4, 0xf5, 0x8c, 0xec, 0x33, 0xc9, - 0xb4, 0x7a, 0x17, 0xd7, 0xf8, 0x47, 0x19, 0x32, 0x3a, 0xf5, 0xb8, 0x3f, 0x89, 0xec, 0x52, 
0xbb, - 0xd0, 0xa9, 0xf6, 0xee, 0x38, 0x7a, 0xae, 0x90, 0xda, 0x39, 0x50, 0x82, 0xe7, 0xc0, 0x0f, 0x02, - 0x19, 0x26, 0x83, 0x2a, 0xbf, 0xac, 0xfc, 0x9e, 0x08, 0xee, 0x97, 0x27, 0x2a, 0xff, 0x91, 0x08, - 0x2e, 0x98, 0x25, 0xea, 0xe1, 0x5b, 0x17, 0x33, 0x60, 0xf3, 0x85, 0x7f, 0x31, 0x84, 0x6b, 0xd0, - 0x92, 0xc7, 0x1d, 0x6a, 0xa6, 0x7b, 0xee, 0x62, 0xd3, 0xf7, 0xe6, 0x9e, 0xa4, 0x33, 0x4f, 0xda, - 0x95, 0x36, 0xea, 0x98, 0xfd, 0xe2, 0xf2, 0x4c, 0x8d, 0x16, 0xca, 0x2f, 0x3d, 0x69, 0xed, 0xe0, - 0xeb, 0xd1, 0xc2, 0xf7, 0x24, 0x7d, 0x1f, 0xc3, 0xf8, 0x6c, 0x13, 0xec, 0x6a, 0x50, 0x7c, 0xab, - 0x6b, 0xd6, 0x11, 0xbe, 0xad, 0x70, 0x42, 0x23, 0x29, 0x42, 0xe6, 0x72, 0x7a, 0xb9, 0x4f, 0xfc, - 0xff, 0xfb, 0x6c, 0x80, 0xc7, 0x50, 0x5b, 0xbc, 0xcb, 0x77, 0xfb, 0x1a, 0xdf, 0x57, 0xae, 0x9c, - 0xba, 0x4c, 0xf2, 0x13, 0x96, 0x50, 0x29, 0x62, 0x48, 0xb9, 0x10, 0x91, 0xf4, 0x02, 0x37, 0x8f, - 0x59, 0x85, 0x7b, 0xb5, 0x41, 0xfb, 0x42, 0x4b, 0x0f, 0xb5, 0xf2, 0x4d, 0x26, 0xd4, 0x99, 0x5f, - 0xe1, 0x9d, 0x7f, 0xfa, 0x65, 0xab, 0xad, 0x81, 0x5d, 0xeb, 0xef, 0x76, 0xb0, 0xe9, 0xe6, 0x33, - 0x5c, 0xbf, 0xba, 0x48, 0xab, 0x8e, 0x0b, 0xc7, 0x3c, 0x81, 0x97, 0x6c, 0x0e, 0xd4, 0xa7, 0xd5, - 0xc0, 0xa5, 0x0f, 0xcc, 0x8f, 0x39, 0x3c, 0x48, 0x73, 0xa0, 0x0f, 0x4f, 0xb6, 0x1e, 0xa3, 0xfe, - 0xd3, 0xd5, 0x9a, 0x18, 0xa7, 0x6b, 0x62, 0x9c, 0xaf, 0x09, 0xfa, 0x94, 0x12, 0xf4, 0x2d, 0x25, - 0x68, 0x99, 0x12, 0xb4, 0x4a, 0x09, 0xfa, 0x91, 0x12, 0xf4, 0x33, 0x25, 0xc6, 0x79, 0x4a, 0xd0, - 0x97, 0x0d, 0x31, 0x56, 0x1b, 0x62, 0x9c, 0x6e, 0x88, 0x71, 0xa4, 0xff, 0xc9, 0x51, 0x19, 0xa6, - 0xf9, 0xe8, 0x57, 0x00, 0x00, 0x00, 0xff, 0xff, 0x16, 0xa4, 0x90, 0x1b, 0xb0, 0x03, 0x00, 0x00, + // 574 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xcf, 0x6e, 0xd3, 0x30, + 0x18, 0x8f, 0xb7, 0x75, 0x2c, 0x4e, 0x37, 0x46, 0x28, 0x22, 0x9b, 0x84, 0xd7, 0x31, 0x24, 0x7a, + 0x40, 0x19, 0x2a, 0x17, 0x04, 0x12, 0x9a, 0xba, 0x0d, 0x38, 0x20, 0x04, 0xed, 0x24, 0xa4, 0x5d, + 0x2c, 0xb7, 0x75, 0xd3, 0xa8, 0x69, 0x52, 0x12, 0x87, 0x91, 0x1b, 0x8f, 0xc0, 0x91, 0x47, 0xe0, + 0x51, 0x7a, 0xa3, 0xc7, 0x9d, 0x06, 0x4d, 0x2f, 0x1c, 0xf7, 0x08, 0xc8, 0x9f, 0x9d, 0x16, 0x26, + 0x81, 0xb8, 0xc5, 0xdf, 0xef, 0x8f, 0xfc, 0xfb, 0x7d, 0x31, 0xb6, 0x12, 0xc1, 0x44, 0xe2, 0x8e, + 0xe2, 0x48, 0x44, 0x76, 0x09, 0x0e, 0xdb, 0x15, 0x2f, 0xf2, 0x22, 0x98, 0xec, 0xcb, 0x2f, 0x05, + 0x6e, 0x13, 0x2f, 0x8a, 0xbc, 0x80, 0xef, 0xc3, 0xa9, 0x9d, 0xf6, 0xf6, 0xbb, 0x69, 0xcc, 0x84, + 0x1f, 0x85, 0x1a, 0xdf, 0xba, 0x8a, 0xb3, 0x30, 0x53, 0xd0, 0xdd, 0x6f, 0xab, 0xb8, 0xd4, 0x92, + 0xd6, 0xf6, 0x01, 0x36, 0xcf, 0x58, 0x10, 0x50, 0xe1, 0x0f, 0xb9, 0x83, 0xaa, 0xa8, 0x66, 0xd5, + 0xb7, 0x5c, 0x25, 0x74, 0x0b, 0xa1, 0x7b, 0xa4, 0x8d, 0x1b, 0x6b, 0xe3, 0x8b, 0x1d, 0xe3, 0xcb, + 0xf7, 0x1d, 0xd4, 0x5c, 0x93, 0xaa, 0x13, 0x7f, 0xc8, 0xed, 0x87, 0xb8, 0xd2, 0xe3, 0xa2, 0xd3, + 0xe7, 0x5d, 0x9a, 0xf0, 0xd8, 0xe7, 0x09, 0xed, 0x44, 0x69, 0x28, 0x9c, 0xa5, 0x2a, 0xaa, 0xad, + 0x34, 0x6d, 0x8d, 0xb5, 0x00, 0x3a, 0x94, 0x88, 0xed, 0xe2, 0x9b, 0x85, 0xa2, 0xd3, 0x4f, 0xc3, + 0x01, 0x6d, 0x67, 0x82, 0x27, 0xce, 0x32, 0x08, 0x6e, 0x68, 0xe8, 0x50, 0x22, 0x0d, 0x09, 0xd8, + 0x0f, 0x70, 0xe1, 0x42, 0xbb, 0x4c, 0x30, 0x4d, 0x5f, 0x01, 0xfa, 0xa6, 0x46, 0x8e, 0x98, 0x60, + 0x8a, 0x7d, 0x80, 0xcb, 0xfc, 0xa3, 0x88, 0x19, 0xed, 0xf9, 0x3c, 0xe8, 0x26, 0x4e, 0xa9, 0xba, + 0x5c, 0xb3, 0xea, 0x77, 0x5c, 0xd5, 0x2b, 0xa4, 0x76, 0x8f, 0x25, 0xe1, 0x39, 0xe0, 0xc7, 0xa1, + 0x88, 0xb3, 0xa6, 0xc5, 0x17, 0x93, 0xdf, 0x13, 0xc1, 0xfd, 0x8a, 
0x44, 0xab, 0x7f, 0x24, 0x82, + 0x0b, 0xea, 0x44, 0x75, 0x7c, 0x6b, 0xde, 0x01, 0x1b, 0x8e, 0x82, 0x79, 0x09, 0xd7, 0x40, 0x52, + 0xc4, 0x6d, 0x29, 0x4c, 0x69, 0x76, 0xb1, 0x19, 0xf8, 0x43, 0x5f, 0xd0, 0xbe, 0x2f, 0x9c, 0xb5, + 0x2a, 0xaa, 0x99, 0x8d, 0x95, 0xf1, 0x85, 0xac, 0x16, 0xc6, 0x2f, 0x7d, 0x61, 0xef, 0xe1, 0xf5, + 0x64, 0x14, 0xf8, 0x82, 0xbe, 0x4f, 0xa1, 0x3e, 0xc7, 0x04, 0xbb, 0x32, 0x0c, 0xdf, 0xaa, 0x99, + 0x7d, 0x8a, 0x6f, 0x4b, 0x38, 0xa3, 0x89, 0x88, 0x62, 0xe6, 0x71, 0xba, 0xd8, 0x27, 0xfe, 0xff, + 0x7d, 0x56, 0xc0, 0xa3, 0xa5, 0x2c, 0xde, 0x15, 0xbb, 0x7d, 0x8d, 0xef, 0x49, 0x57, 0x4e, 0x3d, + 0x26, 0xf8, 0x19, 0xcb, 0xa8, 0x88, 0x52, 0x48, 0x39, 0x8a, 0x12, 0xe1, 0x87, 0x5e, 0x11, 0xd3, + 0x82, 0x7b, 0x55, 0x81, 0xfb, 0x42, 0x51, 0x4f, 0x14, 0xf3, 0x8d, 0x26, 0xaa, 0xcc, 0xaf, 0xf0, + 0xde, 0x3f, 0xfd, 0xf4, 0x6a, 0xcb, 0x60, 0xb7, 0xf3, 0x77, 0x3b, 0xb5, 0xe9, 0xfb, 0xf8, 0x7a, + 0xd2, 0x61, 0x61, 0xb8, 0x68, 0xdd, 0x59, 0x07, 0xe5, 0x86, 0x1e, 0xeb, 0xbe, 0xed, 0x5d, 0x5c, + 0x1e, 0x71, 0x36, 0x98, 0xb3, 0x36, 0x80, 0x65, 0xc9, 0x99, 0xa6, 0x6c, 0x3f, 0xc3, 0x9b, 0x57, + 0x7f, 0x0a, 0x7b, 0x13, 0x2f, 0x0f, 0x78, 0x06, 0xaf, 0xc2, 0x6c, 0xca, 0x4f, 0xbb, 0x82, 0x4b, + 0x1f, 0x58, 0x90, 0x72, 0xf8, 0xb9, 0xcd, 0xa6, 0x3a, 0x3c, 0x59, 0x7a, 0x8c, 0x1a, 0x4f, 0x27, + 0x53, 0x62, 0x9c, 0x4f, 0x89, 0x71, 0x39, 0x25, 0xe8, 0x53, 0x4e, 0xd0, 0xd7, 0x9c, 0xa0, 0x71, + 0x4e, 0xd0, 0x24, 0x27, 0xe8, 0x47, 0x4e, 0xd0, 0xcf, 0x9c, 0x18, 0x97, 0x39, 0x41, 0x9f, 0x67, + 0xc4, 0x98, 0xcc, 0x88, 0x71, 0x3e, 0x23, 0xc6, 0xa9, 0x7a, 0xdf, 0xed, 0x55, 0xd8, 0xcc, 0xa3, + 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xd3, 0x67, 0xda, 0xe5, 0xfc, 0x03, 0x00, 0x00, } func (this *Stats) Equal(that interface{}) bool { @@ -282,13 +304,19 @@ func (this *Stats) Equal(that interface{}) bool { if this.StoreGatewayTouchedPostingBytes != that1.StoreGatewayTouchedPostingBytes { return false } + if this.ScannedSamples != that1.ScannedSamples { + return false + } + if this.PeakSamples != that1.PeakSamples { + return false + } return true } func (this *Stats) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 16) + s := make([]string, 0, 18) s = append(s, "&stats.Stats{") s = append(s, "WallTime: "+fmt.Sprintf("%#v", this.WallTime)+",\n") s = append(s, "FetchedSeriesCount: "+fmt.Sprintf("%#v", this.FetchedSeriesCount)+",\n") @@ -314,6 +342,8 @@ func (this *Stats) GoString() string { s = append(s, "QueryStorageWallTime: "+fmt.Sprintf("%#v", this.QueryStorageWallTime)+",\n") s = append(s, "StoreGatewayTouchedPostingsCount: "+fmt.Sprintf("%#v", this.StoreGatewayTouchedPostingsCount)+",\n") s = append(s, "StoreGatewayTouchedPostingBytes: "+fmt.Sprintf("%#v", this.StoreGatewayTouchedPostingBytes)+",\n") + s = append(s, "ScannedSamples: "+fmt.Sprintf("%#v", this.ScannedSamples)+",\n") + s = append(s, "PeakSamples: "+fmt.Sprintf("%#v", this.PeakSamples)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -345,6 +375,16 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.PeakSamples != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.PeakSamples)) + i-- + dAtA[i] = 0x70 + } + if m.ScannedSamples != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.ScannedSamples)) + i-- + dAtA[i] = 0x68 + } if m.StoreGatewayTouchedPostingBytes != 0 { i = encodeVarintStats(dAtA, i, uint64(m.StoreGatewayTouchedPostingBytes)) i-- @@ -487,6 +527,12 @@ func (m *Stats) Size() (n int) { if m.StoreGatewayTouchedPostingBytes != 0 { n += 1 + 
sovStats(uint64(m.StoreGatewayTouchedPostingBytes)) } + if m.ScannedSamples != 0 { + n += 1 + sovStats(uint64(m.ScannedSamples)) + } + if m.PeakSamples != 0 { + n += 1 + sovStats(uint64(m.PeakSamples)) + } return n } @@ -523,6 +569,8 @@ func (this *Stats) String() string { `QueryStorageWallTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.QueryStorageWallTime), "Duration", "duration.Duration", 1), `&`, ``, 1) + `,`, `StoreGatewayTouchedPostingsCount:` + fmt.Sprintf("%v", this.StoreGatewayTouchedPostingsCount) + `,`, `StoreGatewayTouchedPostingBytes:` + fmt.Sprintf("%v", this.StoreGatewayTouchedPostingBytes) + `,`, + `ScannedSamples:` + fmt.Sprintf("%v", this.ScannedSamples) + `,`, + `PeakSamples:` + fmt.Sprintf("%v", this.PeakSamples) + `,`, `}`, }, "") return s @@ -941,6 +989,44 @@ func (m *Stats) Unmarshal(dAtA []byte) error { break } } + case 13: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ScannedSamples", wireType) + } + m.ScannedSamples = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ScannedSamples |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 14: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PeakSamples", wireType) + } + m.PeakSamples = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.PeakSamples |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) diff --git a/pkg/querier/stats/stats.proto b/pkg/querier/stats/stats.proto index 0bb3beedac..8e53e02ccc 100644 --- a/pkg/querier/stats/stats.proto +++ b/pkg/querier/stats/stats.proto @@ -39,4 +39,10 @@ message Stats { // The total size of postings touched in store gateway for a specific query, in bytes. // Only successful requests from querier to store gateway are included. uint64 store_gateway_touched_posting_bytes = 12; + // The total number of samples scanned while evaluating a query. + // Equal to TotalSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go + uint64 scanned_samples = 13; + // The highest count of samples considered while evaluating a query. 
+  // Equal to PeakSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go
+  uint64 peak_samples = 14;
 }
diff --git a/pkg/querier/stats/stats_test.go b/pkg/querier/stats/stats_test.go
index 5d938a9419..e698d503d9 100644
--- a/pkg/querier/stats/stats_test.go
+++ b/pkg/querier/stats/stats_test.go
@@ -199,6 +199,8 @@ func TestStats_Merge(t *testing.T) {
 	stats1.AddStoreGatewayTouchedPostingBytes(300)
 	stats1.AddFetchedChunks(105)
 	stats1.AddFetchedSamples(109)
+	stats1.AddScannedSamples(100)
+	stats1.AddPeakSamples(100)
 	stats1.AddExtraFields("a", "b")
 	stats1.AddExtraFields("a", "b")
 
@@ -212,6 +214,8 @@
 	stats1.AddStoreGatewayTouchedPostingBytes(301)
 	stats2.AddFetchedChunks(102)
 	stats2.AddFetchedSamples(103)
+	stats2.AddPeakSamples(105)
+	stats2.AddScannedSamples(105)
 	stats2.AddExtraFields("c", "d")
 
 	stats1.Merge(stats2)
@@ -223,6 +227,8 @@
 	assert.Equal(t, uint64(201), stats1.LoadFetchedDataBytes())
 	assert.Equal(t, uint64(207), stats1.LoadFetchedChunks())
 	assert.Equal(t, uint64(212), stats1.LoadFetchedSamples())
+	assert.Equal(t, uint64(205), stats1.LoadScannedSamples())
+	assert.Equal(t, uint64(105), stats1.LoadPeakSamples())
 	assert.Equal(t, uint64(401), stats1.LoadStoreGatewayTouchedPostings())
 	assert.Equal(t, uint64(601), stats1.LoadStoreGatewayTouchedPostingBytes())
 	checkExtraFields(t, []interface{}{"a", "b", "c", "d"}, stats1.LoadExtraFields())
diff --git a/pkg/querier/stats_renderer.go b/pkg/querier/stats_renderer.go
new file mode 100644
index 0000000000..746c63afc5
--- /dev/null
+++ b/pkg/querier/stats_renderer.go
@@ -0,0 +1,20 @@
+package querier
+
+import (
+	"context"
+
+	prom_stats "github.com/prometheus/prometheus/util/stats"
+	v1 "github.com/prometheus/prometheus/web/api/v1"
+
+	"github.com/cortexproject/cortex/pkg/querier/stats"
+)
+
+func StatsRenderer(ctx context.Context, promStat *prom_stats.Statistics, param string) prom_stats.QueryStats {
+	queryStat := stats.FromContext(ctx)
+	if queryStat != nil && promStat != nil {
+		queryStat.AddScannedSamples(uint64(promStat.Samples.TotalSamples))
+		queryStat.AddPeakSamples(uint64(promStat.Samples.PeakSamples))
+	}
+
+	return v1.DefaultStatsRenderer(ctx, promStat, param)
+}
diff --git a/pkg/querier/stats_renderer_test.go b/pkg/querier/stats_renderer_test.go
new file mode 100644
index 0000000000..68b201af08
--- /dev/null
+++ b/pkg/querier/stats_renderer_test.go
@@ -0,0 +1,107 @@
+package querier
+
+import (
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/grafana/regexp"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/common/route"
+	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/promql"
+	"github.com/prometheus/prometheus/storage"
+	v1 "github.com/prometheus/prometheus/web/api/v1"
+	"github.com/stretchr/testify/assert"
+	"github.com/weaveworks/common/user"
+
+	"github.com/cortexproject/cortex/pkg/querier/stats"
+)
+
+type mockSampleAndChunkQueryable struct {
+	queryableFn      func(mint, maxt int64) (storage.Querier, error)
+	chunkQueryableFn func(mint, maxt int64) (storage.ChunkQuerier, error)
+}
+
+func (m mockSampleAndChunkQueryable) Querier(mint, maxt int64) (storage.Querier, error) {
+	return m.queryableFn(mint, maxt)
+}
+
+func (m mockSampleAndChunkQueryable) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
+	return m.chunkQueryableFn(mint, maxt)
+}
+
+func Test_StatsRenderer(t *testing.T) {
+	engine := promql.NewEngine(promql.EngineOpts{
+		MaxSamples: 100,
+		Timeout:    time.Second * 2,
+	})
+	mockQueryable := &mockSampleAndChunkQueryable{
+		queryableFn: func(_, _ int64) (storage.Querier, error) {
+			return mockQuerier{
+				matrix: model.Matrix{
+					{
+						Metric: model.Metric{"__name__": "test", "foo": "bar"},
+						Values: []model.SamplePair{
+							{Timestamp: 1536673665000, Value: 0},
+							{Timestamp: 1536673670000, Value: 1},
+							{Timestamp: 1536673675000, Value: 2},
+							{Timestamp: 1536673680000, Value: 3},
+						},
+					},
+				},
+			}, nil
+		},
+	}
+
+	api := v1.NewAPI(
+		engine,
+		mockQueryable,
+		nil,
+		nil,
+		func(context.Context) v1.ScrapePoolsRetriever { return nil },
+		func(context.Context) v1.TargetRetriever { return &DummyTargetRetriever{} },
+		func(context.Context) v1.AlertmanagerRetriever { return &DummyAlertmanagerRetriever{} },
+		func() config.Config { return config.Config{} },
+		map[string]string{},
+		v1.GlobalURLOptions{},
+		func(f http.HandlerFunc) http.HandlerFunc { return f },
+		nil,   // Only needed for admin APIs.
+		"",    // This is for snapshots, which is disabled when admin APIs are disabled. Hence empty.
+		false, // Disable admin APIs.
+		log.NewNopLogger(),
+		func(context.Context) v1.RulesRetriever { return &DummyRulesRetriever{} },
+		0, 0, 0, // Remote read samples and concurrency limit.
+		false, // Not an agent.
+		regexp.MustCompile(".*"),
+		nil,
+		&v1.PrometheusVersion{},
+		prometheus.DefaultGatherer,
+		nil,
+		StatsRenderer,
+		false,
+		nil,
+		false,
+	)
+
+	promRouter := route.New().WithPrefix("/api/v1")
+	api.Register(promRouter)
+
+	req := httptest.NewRequest(http.MethodGet, "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5", nil)
+	ctx := context.Background()
+	_, ctx = stats.ContextWithEmptyStats(ctx)
+	req = req.WithContext(user.InjectOrgID(ctx, "user1"))
+
+	rec := httptest.NewRecorder()
+	promRouter.ServeHTTP(rec, req)
+
+	assert.Equal(t, http.StatusOK, rec.Code)
+	queryStats := stats.FromContext(ctx)
+	assert.NotNil(t, queryStats)
+	assert.Equal(t, uint64(4), queryStats.LoadPeakSamples())
+	assert.Equal(t, uint64(4), queryStats.LoadScannedSamples())
+}
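
The frontend change above deliberately pairs two metric types: scanned samples are additive across queries, so they feed a per-user counter, while peak samples is a per-query high-water mark, so its distribution is captured in a per-user native histogram. What follows is a minimal, self-contained sketch of that pattern and is not part of the change itself: the metric names and native-histogram options mirror the registration in NewHandler, but the standalone registry, the user1 label value, and the per-query numbers are invented purely for illustration.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Counter: scanned samples accumulate across queries, so per-user totals and
	// rates such as rate(cortex_query_samples_scanned_total[5m]) stay meaningful.
	scanned := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_query_samples_scanned_total",
		Help: "Number of samples scanned to execute a query.",
	}, []string{"user"})

	// Native histogram: peak samples is a per-query high-water mark, so the
	// useful signal is its distribution across queries, not a running sum.
	peak := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Name:                            "cortex_query_peak_samples",
		Help:                            "Highest count of samples considered to execute a query.",
		NativeHistogramBucketFactor:     1.1,
		NativeHistogramMaxBucketNumber:  100,
		NativeHistogramMinResetDuration: 1 * time.Hour,
	}, []string{"user"})

	// Hypothetical per-query stats, standing in for the values reportQueryStats
	// reads from the querier stats attached to the request context.
	queries := []struct{ scannedSamples, peakSamples uint64 }{
		{scannedSamples: 1200, peakSamples: 400},
		{scannedSamples: 300, peakSamples: 150},
	}
	for _, q := range queries {
		scanned.WithLabelValues("user1").Add(float64(q.scannedSamples))
		peak.WithLabelValues("user1").Observe(float64(q.peakSamples))
	}

	// Both families now carry one "user1" series: the counter holds 1500 scanned
	// samples in total, while the histogram holds two peak-sample observations.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Printf("%s: %d series\n", mf.GetName(), len(mf.GetMetric()))
	}
}

In the actual change, these series are produced by reportQueryStats in the query frontend, which reads the values that StatsRenderer recorded on the per-request QueryStats object.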