diff --git a/CHANGELOG.md b/CHANGELOG.md index 6eb792133dd..e692b8c1aa1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ * [BUGFIX] TSDB: Fixed TSDB creation conflict with blocks transfer in a `JOINING` ingester with the experimental TSDB blocks storage. #1818 * [BUGFIX] TSDB: `experimental.tsdb.ship-interval` of <=0 treated as disabled instead of allowing panic. #1975 * [BUGFIX] TSDB: Fixed `cortex_ingester_queried_samples` and `cortex_ingester_queried_series` metrics when using block storage. #1981 +* [BUGFIX] TSDB: Fixed `cortex_ingester_memory_series` and `cortex_ingester_memory_users` metrics when using with the experimental TSDB blocks storage. #1982 ## 0.4.0 / 2019-12-02 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 1ada90d16bf..550fd4e40f3 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -36,13 +36,17 @@ const ( ) type ingesterMetrics struct { - flushQueueLength prometheus.Gauge - ingestedSamples prometheus.Counter - ingestedSamplesFail prometheus.Counter - queries prometheus.Counter - queriedSamples prometheus.Histogram - queriedSeries prometheus.Histogram - queriedChunks prometheus.Histogram + flushQueueLength prometheus.Gauge + ingestedSamples prometheus.Counter + ingestedSamplesFail prometheus.Counter + queries prometheus.Counter + queriedSamples prometheus.Histogram + queriedSeries prometheus.Histogram + queriedChunks prometheus.Histogram + memSeries prometheus.Gauge + memUsers prometheus.Gauge + memSeriesCreatedTotal *prometheus.CounterVec + memSeriesRemovedTotal *prometheus.CounterVec } func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics { @@ -81,6 +85,22 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics { // A small number of chunks per series - 10*(8^(7-1)) = 2.6m. Buckets: prometheus.ExponentialBuckets(10, 8, 7), }), + memSeries: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "cortex_ingester_memory_series", + Help: "The current number of series in memory.", + }), + memUsers: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "cortex_ingester_memory_users", + Help: "The current number of users in memory.", + }), + memSeriesCreatedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_memory_series_created_total", + Help: "The total number of series that were created per user.", + }, []string{"user"}), + memSeriesRemovedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_memory_series_removed_total", + Help: "The total number of series that were removed per user.", + }, []string{"user"}), } if r != nil { @@ -92,6 +112,10 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics { m.queriedSamples, m.queriedSeries, m.queriedChunks, + m.memSeries, + m.memUsers, + m.memSeriesCreatedTotal, + m.memSeriesRemovedTotal, ) } @@ -212,7 +236,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c // Init the limter and instantiate the user states which depend on it i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels) - i.userStates = newUserStates(i.limiter, cfg) + i.userStates = newUserStates(i.limiter, cfg, i.metrics) // Now that user states have been created, we can start the lifecycler i.lifecycler.Start() diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 11bf7f1eb25..9e6ed5b7316 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -68,6 +68,16 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, }, } + // Replace specific metrics which we can't directly track but we need to read + // them from the underlying system (ie. TSDB). + if registerer != nil { + registerer.Unregister(i.metrics.memSeries) + registerer.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_ingester_memory_series", + Help: "The current number of series in memory.", + }, i.numSeriesInTSDB)) + } + i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey) if err != nil { return nil, err @@ -75,7 +85,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, // Init the limter and instantiate the user states which depend on it i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels) - i.userStates = newUserStates(i.limiter, cfg) + i.userStates = newUserStates(i.limiter, cfg, i.metrics) // Scan and open TSDB's that already exist on disk if err := i.openExistingTSDB(context.Background()); err != nil { @@ -399,6 +409,8 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error) // Add the db to list of user databases i.TSDBState.dbs[userID] = db + i.metrics.memUsers.Inc() + return db, nil } @@ -533,7 +545,7 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error { i.userStatesMtx.Lock() i.TSDBState.dbs[userID] = db i.userStatesMtx.Unlock() - + i.metrics.memUsers.Inc() }(userID) return filepath.SkipDir // Don't descend into directories @@ -548,3 +560,16 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error { } return err } + +// numSeriesInTSDB returns the total number of in-memory series across all open TSDBs. +func (i *Ingester) numSeriesInTSDB() float64 { + i.userStatesMtx.RLock() + defer i.userStatesMtx.RUnlock() + + count := uint64(0) + for _, db := range i.TSDBState.dbs { + count += db.Head().NumSeries() + } + + return float64(count) +} diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 2df6f4d0edd..363c9516e4f 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -31,6 +31,12 @@ import ( func TestIngester_v2Push(t *testing.T) { metricLabelAdapters := []client.LabelAdapter{{Name: labels.MetricName, Value: "test"}} metricLabels := client.FromLabelAdaptersToLabels(metricLabelAdapters) + metricNames := []string{ + "cortex_ingester_ingested_samples_total", + "cortex_ingester_ingested_samples_failures_total", + "cortex_ingester_memory_series", + "cortex_ingester_memory_users", + } userID := "test" tests := map[string]struct { @@ -61,6 +67,12 @@ func TestIngester_v2Push(t *testing.T) { # HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion. # TYPE cortex_ingester_ingested_samples_failures_total counter cortex_ingester_ingested_samples_failures_total 0 + # HELP cortex_ingester_memory_users The current number of users in memory. + # TYPE cortex_ingester_memory_users gauge + cortex_ingester_memory_users 1 + # HELP cortex_ingester_memory_series The current number of series in memory. + # TYPE cortex_ingester_memory_series gauge + cortex_ingester_memory_series 1 `, }, "should soft fail on sample out of order": { @@ -85,6 +97,12 @@ func TestIngester_v2Push(t *testing.T) { # HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion. # TYPE cortex_ingester_ingested_samples_failures_total counter cortex_ingester_ingested_samples_failures_total 1 + # HELP cortex_ingester_memory_users The current number of users in memory. + # TYPE cortex_ingester_memory_users gauge + cortex_ingester_memory_users 1 + # HELP cortex_ingester_memory_series The current number of series in memory. + # TYPE cortex_ingester_memory_series gauge + cortex_ingester_memory_series 1 `, }, "should soft fail on sample out of bound": { @@ -109,6 +127,12 @@ func TestIngester_v2Push(t *testing.T) { # HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion. # TYPE cortex_ingester_ingested_samples_failures_total counter cortex_ingester_ingested_samples_failures_total 1 + # HELP cortex_ingester_memory_users The current number of users in memory. + # TYPE cortex_ingester_memory_users gauge + cortex_ingester_memory_users 1 + # HELP cortex_ingester_memory_series The current number of series in memory. + # TYPE cortex_ingester_memory_series gauge + cortex_ingester_memory_series 1 `, }, "should soft fail on two different sample values at the same timestamp": { @@ -133,6 +157,12 @@ func TestIngester_v2Push(t *testing.T) { # HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion. # TYPE cortex_ingester_ingested_samples_failures_total counter cortex_ingester_ingested_samples_failures_total 1 + # HELP cortex_ingester_memory_users The current number of users in memory. + # TYPE cortex_ingester_memory_users gauge + cortex_ingester_memory_users 1 + # HELP cortex_ingester_memory_series The current number of series in memory. + # TYPE cortex_ingester_memory_series gauge + cortex_ingester_memory_series 1 `, }, } @@ -182,13 +212,77 @@ func TestIngester_v2Push(t *testing.T) { assert.Equal(t, testData.expectedIngested, res.Timeseries) // Check tracked Prometheus metrics - metricNames := []string{"cortex_ingester_ingested_samples_total", "cortex_ingester_ingested_samples_failures_total"} err = testutil.GatherAndCompare(registry, strings.NewReader(testData.expectedMetrics), metricNames...) assert.NoError(t, err) }) } } +func TestIngester_v2Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *testing.T) { + metricLabelAdapters := []client.LabelAdapter{{Name: labels.MetricName, Value: "test"}} + metricLabels := client.FromLabelAdaptersToLabels(metricLabelAdapters) + metricNames := []string{ + "cortex_ingester_ingested_samples_total", + "cortex_ingester_ingested_samples_failures_total", + "cortex_ingester_memory_series", + "cortex_ingester_memory_users", + } + + registry := prometheus.NewRegistry() + + // Create a mocked ingester + cfg := defaultIngesterTestConfig() + cfg.LifecyclerConfig.JoinAfter = 0 + + i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, registry) + require.NoError(t, err) + defer i.Shutdown() + defer cleanup() + + // Wait until the ingester is ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + // Push timeseries for each user + for _, userID := range []string{"test-1", "test-2"} { + reqs := []*client.WriteRequest{ + client.ToWriteRequest( + []labels.Labels{metricLabels}, + []client.Sample{{Value: 1, TimestampMs: 9}}, + client.API), + client.ToWriteRequest( + []labels.Labels{metricLabels}, + []client.Sample{{Value: 2, TimestampMs: 10}}, + client.API), + } + + for _, req := range reqs { + ctx := user.InjectOrgID(context.Background(), userID) + _, err := i.v2Push(ctx, req) + require.NoError(t, err) + } + } + + // Check tracked Prometheus metrics + expectedMetrics := ` + # HELP cortex_ingester_ingested_samples_total The total number of samples ingested. + # TYPE cortex_ingester_ingested_samples_total counter + cortex_ingester_ingested_samples_total 4 + # HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion. + # TYPE cortex_ingester_ingested_samples_failures_total counter + cortex_ingester_ingested_samples_failures_total 0 + # HELP cortex_ingester_memory_users The current number of users in memory. + # TYPE cortex_ingester_memory_users gauge + cortex_ingester_memory_users 2 + # HELP cortex_ingester_memory_series The current number of series in memory. + # TYPE cortex_ingester_memory_series gauge + cortex_ingester_memory_series 2 + ` + + assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...)) +} + func Test_Ingester_v2LabelNames(t *testing.T) { series := []struct { lbls labels.Labels diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 8f0129f32da..05d3dad6ea5 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -68,7 +68,7 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e fromIngesterID := "" seriesReceived := 0 xfer := func() error { - userStates := newUserStates(i.limiter, i.cfg) + userStates := newUserStates(i.limiter, i.cfg, i.metrics) for { wireSeries, err := stream.Recv() diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 0999564ac5a..89732c5aed8 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -8,7 +8,6 @@ import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/segmentio/fasthash/fnv1a" @@ -23,31 +22,13 @@ import ( "github.com/weaveworks/common/user" ) -var ( - memSeries = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cortex_ingester_memory_series", - Help: "The current number of series in memory.", - }) - memUsers = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cortex_ingester_memory_users", - Help: "The current number of users in memory.", - }) - memSeriesCreatedTotal = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingester_memory_series_created_total", - Help: "The total number of series that were created per user.", - }, []string{"user"}) - memSeriesRemovedTotal = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingester_memory_series_removed_total", - Help: "The total number of series that were removed per user.", - }, []string{"user"}) -) - // userStates holds the userState object for all users (tenants), // each one containing all the in-memory series for a given user. type userStates struct { states sync.Map limiter *SeriesLimiter cfg Config + metrics *ingesterMetrics } type userState struct { @@ -62,6 +43,7 @@ type userState struct { seriesInMetric []metricCounterShard + memSeries prometheus.Gauge memSeriesCreatedTotal prometheus.Counter memSeriesRemovedTotal prometheus.Counter discardedSamples *prometheus.CounterVec @@ -80,10 +62,11 @@ type metricCounterShard struct { m map[string]int } -func newUserStates(limiter *SeriesLimiter, cfg Config) *userStates { +func newUserStates(limiter *SeriesLimiter, cfg Config, metrics *ingesterMetrics) *userStates { return &userStates{ limiter: limiter, cfg: cfg, + metrics: metrics, } } @@ -157,14 +140,15 @@ func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labe ingestedRuleSamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod), seriesInMetric: seriesInMetric, - memSeriesCreatedTotal: memSeriesCreatedTotal.WithLabelValues(userID), - memSeriesRemovedTotal: memSeriesRemovedTotal.WithLabelValues(userID), + memSeries: us.metrics.memSeries, + memSeriesCreatedTotal: us.metrics.memSeriesCreatedTotal.WithLabelValues(userID), + memSeriesRemovedTotal: us.metrics.memSeriesRemovedTotal.WithLabelValues(userID), discardedSamples: validation.DiscardedSamples.MustCurryWith(prometheus.Labels{"user": userID}), } state.mapper = newFPMapper(state.fpToSeries) stored, ok := us.states.LoadOrStore(userID, state) if !ok { - memUsers.Inc() + us.metrics.memUsers.Inc() } state = stored.(*userState) } @@ -212,7 +196,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri } u.memSeriesCreatedTotal.Inc() - memSeries.Inc() + u.memSeries.Inc() labels := u.index.Add(metric, fp) series = newMemorySeries(labels) @@ -256,7 +240,7 @@ func (u *userState) removeSeries(fp model.Fingerprint, metric labels.Labels) { } u.memSeriesRemovedTotal.Inc() - memSeries.Dec() + u.memSeries.Dec() } // forSeriesMatching passes all series matching the given matchers to the