diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5949b4b20c..44fe71076f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@
 * [ENHANCEMENT] Querier: Added `querier.store-gateway-query-stats-enabled` to enable or disable store gateway query stats log. #5749
 * [ENHANCEMENT] Upgrade to go 1.21.6. #5765
 * [ENHANCEMENT] AlertManager: Retrying AlertManager Delete Silence on error #5794
+* [ENHANCEMENT] Ingester: Add new ingester metric `cortex_ingester_max_inflight_query_requests`. #5798
 * [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
 * [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
 * [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index f2f7cdf5dc..331452d6ef 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -83,6 +83,9 @@ const (
 
 	// Period at which to attempt purging metadata from memory.
 	metadataPurgePeriod = 5 * time.Minute
+
+	// Period at which we should reset the max inflight query requests counter.
+	maxInflightRequestResetPeriod = 1 * time.Minute
 )
 
 var (
@@ -200,6 +203,9 @@ type Ingester struct {
 	// Rate of pushed samples. Only used by V2-ingester to limit global samples push rate.
 	ingestionRate        *util_math.EwmaRate
 	inflightPushRequests atomic.Int64
+
+	inflightQueryRequests    atomic.Int64
+	maxInflightQueryRequests util_math.MaxTracker
 }
 
 // Shipper interface is used to have an easy way to mock it in tests.
@@ -627,7 +633,13 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
 		logger:        logger,
 		ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
 	}
-	i.metrics = newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests)
+	i.metrics = newIngesterMetrics(registerer,
+		false,
+		cfg.ActiveSeriesMetricsEnabled,
+		i.getInstanceLimits,
+		i.ingestionRate,
+		&i.inflightPushRequests,
+		&i.maxInflightQueryRequests)
 
 	// Replace specific metrics which we can't directly track but we need to read
 	// them from the underlying system (ie. TSDB).
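
For orientation before the remaining hunks: they all wire the same track-on-entry pattern into each query entry point (`Query`, `QueryStream`, `LabelValues`, `LabelNames`, and their stream variants). Below is a minimal, self-contained sketch of that pattern — not part of the patch; the `ingester` type and `handleQuery` are illustrative stand-ins, and the single-window max shown here simplifies `util_math.MaxTracker`:

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// ingester is a toy stand-in for the real Ingester; only the two
// fields this patch touches are modeled here.
type ingester struct {
	inflightQueryRequests    atomic.Int64
	maxInflightQueryRequests atomic.Int64 // simplified single-window max
}

// trackInflightQueryRequest mirrors the helper added by this patch: bump
// the inflight counter, record the new value as a candidate maximum, and
// return a closure that callers defer to drop the count on completion.
func (i *ingester) trackInflightQueryRequest() func() {
	n := i.inflightQueryRequests.Inc()
	for { // retry the CAS so a concurrent caller cannot drop a larger value
		l := i.maxInflightQueryRequests.Load()
		if l >= n || i.maxInflightQueryRequests.CompareAndSwap(l, n) {
			break
		}
	}
	return func() { i.inflightQueryRequests.Dec() }
}

func (i *ingester) handleQuery() { // hypothetical query entry point
	c := i.trackInflightQueryRequest()
	defer c()
	// ... evaluate the query while the inflight count stays raised ...
}

func main() {
	i := &ingester{}
	i.handleQuery()
	// After the request returns: 0 inflight, peak of 1 recorded.
	fmt.Println(i.inflightQueryRequests.Load(), i.maxInflightQueryRequests.Load())
}
```

Tracking the peak on the ingester side, rather than relying on scrapes of the instantaneous inflight gauge, means short query bursts between scrapes still show up in the metric.
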
@@ -692,7 +704,14 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
 		TSDBState: newTSDBState(bucketClient, registerer),
 		logger:    logger,
 	}
-	i.metrics = newIngesterMetrics(registerer, false, false, i.getInstanceLimits, nil, &i.inflightPushRequests)
+	i.metrics = newIngesterMetrics(registerer,
+		false,
+		false,
+		i.getInstanceLimits,
+		nil,
+		&i.inflightPushRequests,
+		&i.maxInflightQueryRequests,
+	)
 
 	i.TSDBState.shipperIngesterID = "flusher"
 
@@ -815,6 +834,9 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
 	metadataPurgeTicker := time.NewTicker(metadataPurgePeriod)
 	defer metadataPurgeTicker.Stop()
 
+	maxInflightRequestResetTicker := time.NewTicker(maxInflightRequestResetPeriod)
+	defer maxInflightRequestResetTicker.Stop()
+
 	for {
 		select {
 		case <-metadataPurgeTicker.C:
@@ -831,6 +853,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
 
 		case <-activeSeriesTickerChan:
 			i.updateActiveSeries()
+		case <-maxInflightRequestResetTicker.C:
+			i.maxInflightQueryRequests.Tick()
 		case <-userTSDBConfigTicker.C:
 			i.updateUserTSDBConfigs()
 		case <-ctx.Done():
@@ -1250,6 +1274,9 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client
 		return nil, err
 	}
 
+	c := i.trackInflightQueryRequest()
+	defer c()
+
 	userID, err := tenant.TenantID(ctx)
 	if err != nil {
 		return nil, err
@@ -1322,6 +1349,9 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
 		return nil, err
 	}
 
+	c := i.trackInflightQueryRequest()
+	defer c()
+
 	userID, err := tenant.TenantID(ctx)
 	if err != nil {
 		return nil, err
@@ -1370,6 +1400,8 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
 
 // LabelValues returns all label values that are associated with a given label name.
 func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
+	c := i.trackInflightQueryRequest()
+	defer c()
 	resp, cleanup, err := i.labelsValuesCommon(ctx, req)
 	defer cleanup()
 	return resp, err
@@ -1377,6 +1409,8 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque
 
 // LabelValuesStream returns all label values that are associated with a given label name.
 func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) error {
+	c := i.trackInflightQueryRequest()
+	defer c()
 	resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req)
 	defer cleanup()
 
@@ -1451,6 +1485,8 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
 
 // LabelNames return all the label names.
 func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
+	c := i.trackInflightQueryRequest()
+	defer c()
 	resp, cleanup, err := i.labelNamesCommon(ctx, req)
 	defer cleanup()
 	return resp, err
@@ -1458,6 +1494,8 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest
 
 // LabelNamesStream return all the label names.
 func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) error {
+	c := i.trackInflightQueryRequest()
+	defer c()
 	resp, cleanup, err := i.labelNamesCommon(stream.Context(), req)
 	defer cleanup()
 
@@ -1741,6 +1779,9 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
 		return err
 	}
 
+	c := i.trackInflightQueryRequest()
+	defer c()
+
 	spanlog, ctx := spanlogger.New(stream.Context(), "QueryStream")
 	defer spanlog.Finish()
 
@@ -1786,6 +1827,13 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
 	return nil
 }
 
+func (i *Ingester) trackInflightQueryRequest() func() {
+	i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc())
+	return func() {
+		i.inflightQueryRequests.Dec()
+	}
+}
+
 // queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
 func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples, totalBatchSizeBytes int, _ error) {
 	q, err := db.ChunkQuerier(from, through)
diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go
index fe469e7d76..ac06433bd3 100644
--- a/pkg/ingester/metrics.go
+++ b/pkg/ingester/metrics.go
@@ -46,9 +46,17 @@ type ingesterMetrics struct {
 	ingestionRate           prometheus.GaugeFunc
 	maxInflightPushRequests prometheus.GaugeFunc
 	inflightRequests        prometheus.GaugeFunc
+	inflightQueryRequests   prometheus.GaugeFunc
 }
 
-func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSDB bool, activeSeriesEnabled bool, instanceLimitsFn func() *InstanceLimits, ingestionRate *util_math.EwmaRate, inflightRequests *atomic.Int64) *ingesterMetrics {
+func newIngesterMetrics(r prometheus.Registerer,
+	createMetricsConflictingWithTSDB bool,
+	activeSeriesEnabled bool,
+	instanceLimitsFn func() *InstanceLimits,
+	ingestionRate *util_math.EwmaRate,
+	inflightPushRequests *atomic.Int64,
+	maxInflightQueryRequests *util_math.MaxTracker,
+) *ingesterMetrics {
 	const (
 		instanceLimits     = "cortex_ingester_instance_limits"
 		instanceLimitsHelp = "Instance limits used by this ingester." // Must be same for all registrations.
@@ -187,8 +195,18 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSD
 			Name: "cortex_ingester_inflight_push_requests",
 			Help: "Current number of inflight push requests in ingester.",
 		}, func() float64 {
-			if inflightRequests != nil {
-				return float64(inflightRequests.Load())
+			if inflightPushRequests != nil {
+				return float64(inflightPushRequests.Load())
+			}
+			return 0
+		}),
+
+		inflightQueryRequests: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
+			Name: "cortex_ingester_max_inflight_query_requests",
+			Help: "Max number of inflight query requests in ingester.",
+		}, func() float64 {
+			if maxInflightQueryRequests != nil {
+				return float64(maxInflightQueryRequests.Load())
 			}
 			return 0
 		}),
diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go
index cd688219f4..94f6746f39 100644
--- a/pkg/ingester/metrics_test.go
+++ b/pkg/ingester/metrics_test.go
@@ -8,8 +8,149 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
+
+	util_math "github.com/cortexproject/cortex/pkg/util/math"
 )
 
+func TestIngesterMetrics(t *testing.T) {
+	mainReg := prometheus.NewPedanticRegistry()
+	ingestionRate := util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval)
+	inflightPushRequests := &atomic.Int64{}
+	maxInflightQueryRequests := util_math.MaxTracker{}
+	maxInflightQueryRequests.Track(98)
+	inflightPushRequests.Store(14)
+
+	m := newIngesterMetrics(mainReg,
+		false,
+		true,
+		func() *InstanceLimits {
+			return &InstanceLimits{
+				MaxIngestionRate:        12,
+				MaxInMemoryTenants:      1,
+				MaxInMemorySeries:       11,
+				MaxInflightPushRequests: 6,
+			}
+		},
+		ingestionRate,
+		inflightPushRequests,
+		&maxInflightQueryRequests)
+
+	require.NotNil(t, m)
+
+	err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
+		# HELP cortex_ingester_inflight_push_requests Current number of inflight push requests in ingester.
+		# TYPE cortex_ingester_inflight_push_requests gauge
+		cortex_ingester_inflight_push_requests 14
+		# HELP cortex_ingester_max_inflight_query_requests Max number of inflight query requests in ingester.
+		# TYPE cortex_ingester_max_inflight_query_requests gauge
+		cortex_ingester_max_inflight_query_requests 98
+		# HELP cortex_ingester_ingested_exemplars_failures_total The total number of exemplars that errored on ingestion.
+		# TYPE cortex_ingester_ingested_exemplars_failures_total counter
+		cortex_ingester_ingested_exemplars_failures_total 0
+		# HELP cortex_ingester_ingested_exemplars_total The total number of exemplars ingested.
+		# TYPE cortex_ingester_ingested_exemplars_total counter
+		cortex_ingester_ingested_exemplars_total 0
+		# HELP cortex_ingester_ingested_metadata_failures_total The total number of metadata that errored on ingestion.
+		# TYPE cortex_ingester_ingested_metadata_failures_total counter
+		cortex_ingester_ingested_metadata_failures_total 0
+		# HELP cortex_ingester_ingested_metadata_total The total number of metadata ingested.
+		# TYPE cortex_ingester_ingested_metadata_total counter
+		cortex_ingester_ingested_metadata_total 0
+		# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
+		# TYPE cortex_ingester_ingested_samples_failures_total counter
+		cortex_ingester_ingested_samples_failures_total 0
+		# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
+		# TYPE cortex_ingester_ingested_samples_total counter
+		cortex_ingester_ingested_samples_total 0
+		# HELP cortex_ingester_ingestion_rate_samples_per_second Current ingestion rate in samples/sec that ingester is using to limit access.
+		# TYPE cortex_ingester_ingestion_rate_samples_per_second gauge
+		cortex_ingester_ingestion_rate_samples_per_second 0
+		# HELP cortex_ingester_instance_limits Instance limits used by this ingester.
+		# TYPE cortex_ingester_instance_limits gauge
+		cortex_ingester_instance_limits{limit="max_inflight_push_requests"} 6
+		cortex_ingester_instance_limits{limit="max_ingestion_rate"} 12
+		cortex_ingester_instance_limits{limit="max_series"} 11
+		cortex_ingester_instance_limits{limit="max_tenants"} 1
+		# HELP cortex_ingester_memory_metadata The current number of metadata in memory.
+		# TYPE cortex_ingester_memory_metadata gauge
+		cortex_ingester_memory_metadata 0
+		# HELP cortex_ingester_memory_series The current number of series in memory.
+		# TYPE cortex_ingester_memory_series gauge
+		cortex_ingester_memory_series 0
+		# HELP cortex_ingester_memory_users The current number of users in memory.
+		# TYPE cortex_ingester_memory_users gauge
+		cortex_ingester_memory_users 0
+		# HELP cortex_ingester_queried_chunks The total number of chunks returned from queries.
+		# TYPE cortex_ingester_queried_chunks histogram
+		cortex_ingester_queried_chunks_bucket{le="10"} 0
+		cortex_ingester_queried_chunks_bucket{le="80"} 0
+		cortex_ingester_queried_chunks_bucket{le="640"} 0
+		cortex_ingester_queried_chunks_bucket{le="5120"} 0
+		cortex_ingester_queried_chunks_bucket{le="40960"} 0
+		cortex_ingester_queried_chunks_bucket{le="327680"} 0
+		cortex_ingester_queried_chunks_bucket{le="2.62144e+06"} 0
+		cortex_ingester_queried_chunks_bucket{le="+Inf"} 0
+		cortex_ingester_queried_chunks_sum 0
+		cortex_ingester_queried_chunks_count 0
+		# HELP cortex_ingester_queried_exemplars The total number of exemplars returned from queries.
+		# TYPE cortex_ingester_queried_exemplars histogram
+		cortex_ingester_queried_exemplars_bucket{le="10"} 0
+		cortex_ingester_queried_exemplars_bucket{le="50"} 0
+		cortex_ingester_queried_exemplars_bucket{le="250"} 0
+		cortex_ingester_queried_exemplars_bucket{le="1250"} 0
+		cortex_ingester_queried_exemplars_bucket{le="6250"} 0
+		cortex_ingester_queried_exemplars_bucket{le="+Inf"} 0
+		cortex_ingester_queried_exemplars_sum 0
+		cortex_ingester_queried_exemplars_count 0
+		# HELP cortex_ingester_queried_samples The total number of samples returned from queries.
+		# TYPE cortex_ingester_queried_samples histogram
+		cortex_ingester_queried_samples_bucket{le="10"} 0
+		cortex_ingester_queried_samples_bucket{le="80"} 0
+		cortex_ingester_queried_samples_bucket{le="640"} 0
+		cortex_ingester_queried_samples_bucket{le="5120"} 0
+		cortex_ingester_queried_samples_bucket{le="40960"} 0
+		cortex_ingester_queried_samples_bucket{le="327680"} 0
+		cortex_ingester_queried_samples_bucket{le="2.62144e+06"} 0
+		cortex_ingester_queried_samples_bucket{le="2.097152e+07"} 0
+		cortex_ingester_queried_samples_bucket{le="+Inf"} 0
+		cortex_ingester_queried_samples_sum 0
+		cortex_ingester_queried_samples_count 0
+		# HELP cortex_ingester_queried_series The total number of series returned from queries.
+		# TYPE cortex_ingester_queried_series histogram
+		cortex_ingester_queried_series_bucket{le="10"} 0
+		cortex_ingester_queried_series_bucket{le="80"} 0
+		cortex_ingester_queried_series_bucket{le="640"} 0
+		cortex_ingester_queried_series_bucket{le="5120"} 0
+		cortex_ingester_queried_series_bucket{le="40960"} 0
+		cortex_ingester_queried_series_bucket{le="327680"} 0
+		cortex_ingester_queried_series_bucket{le="+Inf"} 0
+		cortex_ingester_queried_series_sum 0
+		cortex_ingester_queried_series_count 0
+		# HELP cortex_ingester_queries_total The total number of queries the ingester has handled.
+		# TYPE cortex_ingester_queries_total counter
+		cortex_ingester_queries_total 0
+	`))
+	require.NoError(t, err)
+
+	require.Equal(t, int64(98), maxInflightQueryRequests.Load())
+	maxInflightQueryRequests.Tick()
+
+	err = testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
+		# HELP cortex_ingester_max_inflight_query_requests Max number of inflight query requests in ingester.
+		# TYPE cortex_ingester_max_inflight_query_requests gauge
+		cortex_ingester_max_inflight_query_requests 98
+	`), "cortex_ingester_max_inflight_query_requests")
+	require.NoError(t, err)
+	maxInflightQueryRequests.Tick()
+	err = testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
+		# HELP cortex_ingester_max_inflight_query_requests Max number of inflight query requests in ingester.
+		# TYPE cortex_ingester_max_inflight_query_requests gauge
+		cortex_ingester_max_inflight_query_requests 0
+	`), "cortex_ingester_max_inflight_query_requests")
+	require.NoError(t, err)
+}
+
 func TestTSDBMetrics(t *testing.T) {
 	mainReg := prometheus.NewPedanticRegistry()
diff --git a/pkg/util/math/max_tracker.go b/pkg/util/math/max_tracker.go
new file mode 100644
index 0000000000..5aebd34864
--- /dev/null
+++ b/pkg/util/math/max_tracker.go
@@ -0,0 +1,44 @@
+package math
+
+import "go.uber.org/atomic"
+
+// MaxTracker keeps the maximum value observed across the current and the
+// previous tick windows, so a reported peak survives one reset period.
+type MaxTracker struct {
+	current atomic.Int64
+	old     atomic.Int64
+}
+
+// Track records val if it exceeds the current window's maximum. The
+// compare-and-swap is retried so that racing callers cannot drop a
+// larger value.
+func (m *MaxTracker) Track(val int64) {
+	for {
+		l := m.current.Load()
+		if l >= val {
+			return
+		}
+		if m.current.CompareAndSwap(l, val) {
+			return
+		}
+	}
+}
+
+// Tick rotates the windows: the current maximum becomes the previous one
+// and the current window restarts from zero.
+func (m *MaxTracker) Tick() {
+	m.old.Store(m.current.Load())
+	m.current.Store(0)
+}
+
+// Load returns the larger of the two windows' maxima.
+func (m *MaxTracker) Load() int64 {
+	c := m.current.Load()
+	o := m.old.Load()
+
+	if c > o {
+		return c
+	}
+
+	return o
+}
diff --git a/pkg/util/math/max_tracker_test.go b/pkg/util/math/max_tracker_test.go
new file mode 100644
index 0000000000..02f55e5e84
--- /dev/null
+++ b/pkg/util/math/max_tracker_test.go
@@ -0,0 +1,17 @@
+package math
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestMaxTracker(t *testing.T) {
+	mt := MaxTracker{}
+	mt.Track(50)
+	require.Equal(t, int64(50), mt.Load())
+	mt.Tick()
+	require.Equal(t, int64(50), mt.Load())
+	mt.Tick()
+	require.Equal(t, int64(0), mt.Load())
+}
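
To make the reset semantics concrete, here is a short usage sketch of the two-window behavior (outside the patch; it assumes the import path introduced above, and the printed values follow TestMaxTracker). Because `Load` considers both windows, a scrape that lands just after a reset still reports the previous window's peak:

```go
package main

import (
	"fmt"

	util_math "github.com/cortexproject/cortex/pkg/util/math"
)

func main() {
	var mt util_math.MaxTracker

	mt.Track(98)           // a burst: 98 queries inflight at peak
	fmt.Println(mt.Load()) // 98 — the current window holds the peak

	mt.Tick()              // first reset (fires every maxInflightRequestResetPeriod)
	fmt.Println(mt.Load()) // still 98 — retained in the previous window

	mt.Tick()              // second reset, no queries in between
	fmt.Println(mt.Load()) // 0 — the peak has aged out of both windows
}
```

With the one-minute reset period set in ingester.go, a recorded peak therefore stays visible for one to two minutes before it ages out.
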