Skip to content

Fix ingester_memory_series/users metrics for TSDB storage #1982

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* [BUGFIX] TSDB: Fixed TSDB creation conflict with blocks transfer in a `JOINING` ingester with the experimental TSDB blocks storage. #1818
* [BUGFIX] TSDB: `experimental.tsdb.ship-interval` of <=0 treated as disabled instead of allowing panic. #1975
* [BUGFIX] TSDB: Fixed `cortex_ingester_queried_samples` and `cortex_ingester_queried_series` metrics when using block storage. #1981
* [BUGFIX] TSDB: Fixed `cortex_ingester_memory_series` and `cortex_ingester_memory_users` metrics when using with the experimental TSDB blocks storage. #1982

## 0.4.0 / 2019-12-02

Expand Down
40 changes: 32 additions & 8 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,17 @@ const (
)

type ingesterMetrics struct {
flushQueueLength prometheus.Gauge
ingestedSamples prometheus.Counter
ingestedSamplesFail prometheus.Counter
queries prometheus.Counter
queriedSamples prometheus.Histogram
queriedSeries prometheus.Histogram
queriedChunks prometheus.Histogram
flushQueueLength prometheus.Gauge
ingestedSamples prometheus.Counter
ingestedSamplesFail prometheus.Counter
queries prometheus.Counter
queriedSamples prometheus.Histogram
queriedSeries prometheus.Histogram
queriedChunks prometheus.Histogram
memSeries prometheus.Gauge
memUsers prometheus.Gauge
memSeriesCreatedTotal *prometheus.CounterVec
memSeriesRemovedTotal *prometheus.CounterVec
}

func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
Expand Down Expand Up @@ -81,6 +85,22 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
// A small number of chunks per series - 10*(8^(7-1)) = 2.6m.
Buckets: prometheus.ExponentialBuckets(10, 8, 7),
}),
memSeries: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "cortex_ingester_memory_series",
Help: "The current number of series in memory.",
}),
memUsers: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "cortex_ingester_memory_users",
Help: "The current number of users in memory.",
}),
memSeriesCreatedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_memory_series_created_total",
Help: "The total number of series that were created per user.",
}, []string{"user"}),
memSeriesRemovedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_memory_series_removed_total",
Help: "The total number of series that were removed per user.",
}, []string{"user"}),
}

if r != nil {
Expand All @@ -92,6 +112,10 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
m.queriedSamples,
m.queriedSeries,
m.queriedChunks,
m.memSeries,
m.memUsers,
m.memSeriesCreatedTotal,
m.memSeriesRemovedTotal,
)
}

Expand Down Expand Up @@ -212,7 +236,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c

// Init the limter and instantiate the user states which depend on it
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
i.userStates = newUserStates(i.limiter, cfg)
i.userStates = newUserStates(i.limiter, cfg, i.metrics)

// Now that user states have been created, we can start the lifecycler
i.lifecycler.Start()
Expand Down
29 changes: 27 additions & 2 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,24 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
},
}

// Replace specific metrics which we can't directly track but we need to read
// them from the underlying system (ie. TSDB).
if registerer != nil {
registerer.Unregister(i.metrics.memSeries)
registerer.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_ingester_memory_series",
Help: "The current number of series in memory.",
}, i.numSeriesInTSDB))
}

i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey)
if err != nil {
return nil, err
}

// Init the limter and instantiate the user states which depend on it
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
i.userStates = newUserStates(i.limiter, cfg)
i.userStates = newUserStates(i.limiter, cfg, i.metrics)

// Scan and open TSDB's that already exist on disk
if err := i.openExistingTSDB(context.Background()); err != nil {
Expand Down Expand Up @@ -399,6 +409,8 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error)

// Add the db to list of user databases
i.TSDBState.dbs[userID] = db
i.metrics.memUsers.Inc()

return db, nil
}

Expand Down Expand Up @@ -533,7 +545,7 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
i.userStatesMtx.Lock()
i.TSDBState.dbs[userID] = db
i.userStatesMtx.Unlock()

i.metrics.memUsers.Inc()
}(userID)

return filepath.SkipDir // Don't descend into directories
Expand All @@ -548,3 +560,16 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
}
return err
}

// numSeriesInTSDB returns the total number of in-memory series across all open TSDBs.
func (i *Ingester) numSeriesInTSDB() float64 {
i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()

count := uint64(0)
for _, db := range i.TSDBState.dbs {
count += db.Head().NumSeries()
}

return float64(count)
}
96 changes: 95 additions & 1 deletion pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ import (
func TestIngester_v2Push(t *testing.T) {
metricLabelAdapters := []client.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
metricLabels := client.FromLabelAdaptersToLabels(metricLabelAdapters)
metricNames := []string{
"cortex_ingester_ingested_samples_total",
"cortex_ingester_ingested_samples_failures_total",
"cortex_ingester_memory_series",
"cortex_ingester_memory_users",
}
userID := "test"

tests := map[string]struct {
Expand Down Expand Up @@ -61,6 +67,12 @@ func TestIngester_v2Push(t *testing.T) {
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 0
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 1
`,
},
"should soft fail on sample out of order": {
Expand All @@ -85,6 +97,12 @@ func TestIngester_v2Push(t *testing.T) {
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 1
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 1
`,
},
"should soft fail on sample out of bound": {
Expand All @@ -109,6 +127,12 @@ func TestIngester_v2Push(t *testing.T) {
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 1
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 1
`,
},
"should soft fail on two different sample values at the same timestamp": {
Expand All @@ -133,6 +157,12 @@ func TestIngester_v2Push(t *testing.T) {
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 1
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 1
`,
},
}
Expand Down Expand Up @@ -182,13 +212,77 @@ func TestIngester_v2Push(t *testing.T) {
assert.Equal(t, testData.expectedIngested, res.Timeseries)

// Check tracked Prometheus metrics
metricNames := []string{"cortex_ingester_ingested_samples_total", "cortex_ingester_ingested_samples_failures_total"}
err = testutil.GatherAndCompare(registry, strings.NewReader(testData.expectedMetrics), metricNames...)
assert.NoError(t, err)
})
}
}

func TestIngester_v2Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *testing.T) {
metricLabelAdapters := []client.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
metricLabels := client.FromLabelAdaptersToLabels(metricLabelAdapters)
metricNames := []string{
"cortex_ingester_ingested_samples_total",
"cortex_ingester_ingested_samples_failures_total",
"cortex_ingester_memory_series",
"cortex_ingester_memory_users",
}

registry := prometheus.NewRegistry()

// Create a mocked ingester
cfg := defaultIngesterTestConfig()
cfg.LifecyclerConfig.JoinAfter = 0

i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, registry)
require.NoError(t, err)
defer i.Shutdown()
defer cleanup()

// Wait until the ingester is ACTIVE
test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

// Push timeseries for each user
for _, userID := range []string{"test-1", "test-2"} {
reqs := []*client.WriteRequest{
client.ToWriteRequest(
[]labels.Labels{metricLabels},
[]client.Sample{{Value: 1, TimestampMs: 9}},
client.API),
client.ToWriteRequest(
[]labels.Labels{metricLabels},
[]client.Sample{{Value: 2, TimestampMs: 10}},
client.API),
}

for _, req := range reqs {
ctx := user.InjectOrgID(context.Background(), userID)
_, err := i.v2Push(ctx, req)
require.NoError(t, err)
}
}

// Check tracked Prometheus metrics
expectedMetrics := `
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
# TYPE cortex_ingester_ingested_samples_total counter
cortex_ingester_ingested_samples_total 4
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 0
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 2
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 2
`

assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))
}

func Test_Ingester_v2LabelNames(t *testing.T) {
series := []struct {
lbls labels.Labels
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e
fromIngesterID := ""
seriesReceived := 0
xfer := func() error {
userStates := newUserStates(i.limiter, i.cfg)
userStates := newUserStates(i.limiter, i.cfg, i.metrics)

for {
wireSeries, err := stream.Recv()
Expand Down
36 changes: 10 additions & 26 deletions pkg/ingester/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/segmentio/fasthash/fnv1a"
Expand All @@ -23,31 +22,13 @@ import (
"github.com/weaveworks/common/user"
)

var (
memSeries = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cortex_ingester_memory_series",
Help: "The current number of series in memory.",
})
memUsers = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cortex_ingester_memory_users",
Help: "The current number of users in memory.",
})
memSeriesCreatedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_memory_series_created_total",
Help: "The total number of series that were created per user.",
}, []string{"user"})
memSeriesRemovedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_memory_series_removed_total",
Help: "The total number of series that were removed per user.",
}, []string{"user"})
)

// userStates holds the userState object for all users (tenants),
// each one containing all the in-memory series for a given user.
type userStates struct {
states sync.Map
limiter *SeriesLimiter
cfg Config
metrics *ingesterMetrics
}

type userState struct {
Expand All @@ -62,6 +43,7 @@ type userState struct {

seriesInMetric []metricCounterShard

memSeries prometheus.Gauge
memSeriesCreatedTotal prometheus.Counter
memSeriesRemovedTotal prometheus.Counter
discardedSamples *prometheus.CounterVec
Expand All @@ -80,10 +62,11 @@ type metricCounterShard struct {
m map[string]int
}

func newUserStates(limiter *SeriesLimiter, cfg Config) *userStates {
func newUserStates(limiter *SeriesLimiter, cfg Config, metrics *ingesterMetrics) *userStates {
return &userStates{
limiter: limiter,
cfg: cfg,
metrics: metrics,
}
}

Expand Down Expand Up @@ -157,14 +140,15 @@ func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labe
ingestedRuleSamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod),
seriesInMetric: seriesInMetric,

memSeriesCreatedTotal: memSeriesCreatedTotal.WithLabelValues(userID),
memSeriesRemovedTotal: memSeriesRemovedTotal.WithLabelValues(userID),
memSeries: us.metrics.memSeries,
memSeriesCreatedTotal: us.metrics.memSeriesCreatedTotal.WithLabelValues(userID),
memSeriesRemovedTotal: us.metrics.memSeriesRemovedTotal.WithLabelValues(userID),
discardedSamples: validation.DiscardedSamples.MustCurryWith(prometheus.Labels{"user": userID}),
}
state.mapper = newFPMapper(state.fpToSeries)
stored, ok := us.states.LoadOrStore(userID, state)
if !ok {
memUsers.Inc()
us.metrics.memUsers.Inc()
}
state = stored.(*userState)
}
Expand Down Expand Up @@ -212,7 +196,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri
}

u.memSeriesCreatedTotal.Inc()
memSeries.Inc()
u.memSeries.Inc()

labels := u.index.Add(metric, fp)
series = newMemorySeries(labels)
Expand Down Expand Up @@ -256,7 +240,7 @@ func (u *userState) removeSeries(fp model.Fingerprint, metric labels.Labels) {
}

u.memSeriesRemovedTotal.Inc()
memSeries.Dec()
u.memSeries.Dec()
}

// forSeriesMatching passes all series matching the given matchers to the
Expand Down