Skip to content

Add /series support to TSDB storage #1830

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,10 @@ func (i *Ingester) LabelNames(ctx old_ctx.Context, req *client.LabelNamesRequest

// MetricsForLabelMatchers returns all the metrics which match a set of matchers.
func (i *Ingester) MetricsForLabelMatchers(ctx old_ctx.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
if i.cfg.TSDBEnabled {
return i.v2MetricsForLabelMatchers(ctx, req)
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()
state, ok, err := i.userStates.getViaContext(ctx)
Expand Down
107 changes: 78 additions & 29 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
lbls "github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -162,29 +161,12 @@ func (i *Ingester) v2Query(ctx old_ctx.Context, req *client.QueryRequest) (*clie
}
defer q.Close()

// two different versions of the labels package are being used, converting matchers must be done
var converted []lbls.Matcher
for _, m := range matchers {
switch m.Type {
case labels.MatchEqual:
converted = append(converted, lbls.NewEqualMatcher(m.Name, m.Value))
case labels.MatchNotEqual:
converted = append(converted, lbls.Not(lbls.NewEqualMatcher(m.Name, m.Value)))
case labels.MatchRegexp:
rm, err := lbls.NewRegexpMatcher(m.Name, "^(?:"+m.Value+")$")
if err != nil {
return nil, err
}
converted = append(converted, rm)
case labels.MatchNotRegexp:
rm, err := lbls.NewRegexpMatcher(m.Name, "^(?:"+m.Value+")$")
if err != nil {
return nil, err
}
converted = append(converted, lbls.Not(rm))
}
convertedMatchers, err := cortex_tsdb.FromLegacyLabelMatchersToMatchers(matchers)
if err != nil {
return nil, err
}
ss, err := q.Select(converted...)

ss, err := q.Select(convertedMatchers...)
if err != nil {
return nil, err
}
Expand All @@ -193,13 +175,8 @@ func (i *Ingester) v2Query(ctx old_ctx.Context, req *client.QueryRequest) (*clie
for ss.Next() {
series := ss.At()

// convert labels to LabelAdapter
var adapters []client.LabelAdapter
for _, l := range series.Labels() {
adapters = append(adapters, client.LabelAdapter(l))
}
ts := client.TimeSeries{
Labels: adapters,
Labels: cortex_tsdb.FromLabelsToLabelAdapters(series.Labels()),
}

it := series.Iterator()
Expand Down Expand Up @@ -272,6 +249,78 @@ func (i *Ingester) v2LabelNames(ctx old_ctx.Context, req *client.LabelNamesReque
}, nil
}

func (i *Ingester) v2MetricsForLabelMatchers(ctx old_ctx.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

db := i.getTSDB(userID)
if db == nil {
return &client.MetricsForLabelMatchersResponse{}, nil
}

// Parse the request
from, to, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req)
if err != nil {
return nil, err
}

// Create a new instance of the TSDB querier
q, err := db.Querier(int64(from), int64(to))
if err != nil {
return nil, err
}
defer q.Close()

// Run a query for each matchers set and collect all the results
added := map[string]struct{}{}
result := &client.MetricsForLabelMatchersResponse{
Metric: make([]*client.Metric, 0),
}

for _, matchers := range matchersSet {
convertedMatchers, err := cortex_tsdb.FromLegacyLabelMatchersToMatchers(matchers)
if err != nil {
return nil, err
}

seriesSet, err := q.Select(convertedMatchers...)
if err != nil {
return nil, err
}

for seriesSet.Next() {
if seriesSet.Err() != nil {
break
}

// Given the same series can be matched by multiple matchers and we want to
// return the unique set of matching series, we do check if the series has
// already been added to the result
ls := seriesSet.At().Labels()
key := ls.String()
if _, ok := added[key]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be consistent with upstream, can we use this: https://github.com/prometheus/prometheus/blob/0ae4899c47fbcdab1b0bebedf213b4424f50f760/web/api/v1/api.go#L546-L550

It's an exposed function and I see it fitting perfectly here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked it. The storage.NewMergeSeriesSet() you pointed out works with the storage pkg structs, while here we work with tsdb structs (incompatible types). I checked the merge functions in the tsdb pkg and they look a bit overkill for what we need here: they merge series samples, while here we just need a unique set of labels.

I would suggest to keep this code as is.

continue
}

result.Metric = append(result.Metric, &client.Metric{
Labels: cortex_tsdb.FromLabelsToLabelAdapters(ls),
})

added[key] = struct{}{}
}

// In case of any error while iterating the series, we break
// the execution and return it
if err := seriesSet.Err(); err != nil {
return nil, err
}
}

return result, nil
}

func (i *Ingester) getTSDB(userID string) *tsdb.DB {
i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()
Expand Down
149 changes: 149 additions & 0 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,155 @@ func Test_Ingester_v2Query(t *testing.T) {
}
}

// Test_Ingester_v2MetricsForLabelMatchers pushes a fixed set of series into a
// TSDB-backed ingester and exercises v2MetricsForLabelMatchers across several
// matcher/time-range combinations, including deduplication of series matched
// by overlapping matchers and series whose FastFingerprint values collide.
func Test_Ingester_v2MetricsForLabelMatchers(t *testing.T) {
	// Series written to the ingester before running the test cases.
	fixtures := []struct {
		lbls      labels.Labels
		value     float64
		timestamp int64
	}{
		{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}}, 1, 100000},
		{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}}, 1, 110000},
		{labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000},
		// The two following series have the same FastFingerprint=e002a3a451262627
		{labels.Labels{{Name: labels.MetricName, Value: "collision"}, {Name: "app", Value: "l"}, {Name: "uniq0", Value: "0"}, {Name: "uniq1", Value: "1"}}, 1, 300000},
		{labels.Labels{{Name: labels.MetricName, Value: "collision"}, {Name: "app", Value: "m"}, {Name: "uniq0", Value: "1"}, {Name: "uniq1", Value: "1"}}, 1, 300000},
	}

	// Table of test cases: each queries [from, to] with one or more matcher
	// sets and lists the metrics expected back (order-insensitive).
	tests := map[string]struct {
		from     int64
		to       int64
		matchers []*client.LabelMatchers
		expected []*client.Metric
	}{
		"should return an empty response if no metric match": {
			from: math.MinInt64,
			to:   math.MaxInt64,
			matchers: []*client.LabelMatchers{{
				Matchers: []*client.LabelMatcher{
					{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "unknown"},
				},
			}},
			expected: []*client.Metric{},
		},
		"should filter metrics by single matcher": {
			from: math.MinInt64,
			to:   math.MaxInt64,
			matchers: []*client.LabelMatchers{{
				Matchers: []*client.LabelMatcher{
					{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
				},
			}},
			expected: []*client.Metric{
				{Labels: client.FromLabelsToLabelAdapters(fixtures[0].lbls)},
				{Labels: client.FromLabelsToLabelAdapters(fixtures[1].lbls)},
			},
		},
		"should filter metrics by multiple matchers": {
			from: math.MinInt64,
			to:   math.MaxInt64,
			matchers: []*client.LabelMatchers{
				{
					Matchers: []*client.LabelMatcher{
						{Type: client.EQUAL, Name: "status", Value: "200"},
					},
				},
				{
					Matchers: []*client.LabelMatcher{
						{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_2"},
					},
				},
			},
			expected: []*client.Metric{
				{Labels: client.FromLabelsToLabelAdapters(fixtures[0].lbls)},
				{Labels: client.FromLabelsToLabelAdapters(fixtures[2].lbls)},
			},
		},
		"should filter metrics by time range": {
			from: 100000,
			to:   100000,
			matchers: []*client.LabelMatchers{{
				Matchers: []*client.LabelMatcher{
					{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
				},
			}},
			expected: []*client.Metric{
				{Labels: client.FromLabelsToLabelAdapters(fixtures[0].lbls)},
			},
		},
		"should not return duplicated metrics on overlapping matchers": {
			from: math.MinInt64,
			to:   math.MaxInt64,
			matchers: []*client.LabelMatchers{
				{
					Matchers: []*client.LabelMatcher{
						{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
					},
				},
				{
					Matchers: []*client.LabelMatcher{
						{Type: client.REGEX_MATCH, Name: model.MetricNameLabel, Value: "test.*"},
					},
				},
			},
			expected: []*client.Metric{
				{Labels: client.FromLabelsToLabelAdapters(fixtures[0].lbls)},
				{Labels: client.FromLabelsToLabelAdapters(fixtures[1].lbls)},
				{Labels: client.FromLabelsToLabelAdapters(fixtures[2].lbls)},
			},
		},
		"should return all matching metrics even if their FastFingerprint collide": {
			from: math.MinInt64,
			to:   math.MaxInt64,
			matchers: []*client.LabelMatchers{{
				Matchers: []*client.LabelMatcher{
					{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "collision"},
				},
			}},
			expected: []*client.Metric{
				{Labels: client.FromLabelsToLabelAdapters(fixtures[3].lbls)},
				{Labels: client.FromLabelsToLabelAdapters(fixtures[4].lbls)},
			},
		},
	}

	// Create ingester
	i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
	require.NoError(t, err)
	defer i.Shutdown()
	defer cleanup()

	// Wait until it's ACTIVE
	test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
		return i.lifecycler.GetState()
	})

	// Push fixtures
	ctx := user.InjectOrgID(context.Background(), "test")

	for _, series := range fixtures {
		req, _ := mockWriteRequest(series.lbls, series.value, series.timestamp)
		_, err := i.v2Push(ctx, req)
		require.NoError(t, err)
	}

	// Run tests
	for testName, testData := range tests {
		// Capture range variable for the subtest closure (pre-Go 1.22 semantics).
		testData := testData

		t.Run(testName, func(t *testing.T) {
			req := &client.MetricsForLabelMatchersRequest{
				StartTimestampMs: testData.from,
				EndTimestampMs:   testData.to,
				MatchersSet:      testData.matchers,
			}

			res, err := i.v2MetricsForLabelMatchers(ctx, req)
			require.NoError(t, err)
			// Order of returned metrics is not guaranteed, so compare as sets.
			assert.ElementsMatch(t, testData.expected, res.Metric)
		})
	}
}

func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) (*client.WriteRequest, *client.QueryResponse) {
samples := []client.Sample{
{
Expand Down
60 changes: 8 additions & 52 deletions pkg/ingester/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,32 +109,9 @@ func TestIngesterTransfer(t *testing.T) {
})

// Now write a sample to this ingester
const ts = 123000
const val = 456
var (
l = labels.Labels{{Name: labels.MetricName, Value: "foo"}}
sampleData = []client.Sample{
{
TimestampMs: ts,
Value: val,
},
}
expectedResponse = &client.QueryResponse{
Timeseries: []client.TimeSeries{
{
Labels: client.FromLabelsToLabelAdapters(l),
Samples: []client.Sample{
{
Value: val,
TimestampMs: ts,
},
},
},
},
}
)
req, expectedResponse := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
ctx := user.InjectOrgID(context.Background(), userID)
_, err = ing1.Push(ctx, client.ToWriteRequest([]labels.Labels{l}, sampleData, client.API))
_, err = ing1.Push(ctx, req)
require.NoError(t, err)

// Start a second ingester, but let it go into PENDING
Expand Down Expand Up @@ -171,7 +148,8 @@ func TestIngesterTransfer(t *testing.T) {
assert.Equal(t, expectedResponse, response)

// Check we can send the same sample again to the new ingester and get the same result
_, err = ing2.Push(ctx, client.ToWriteRequest([]labels.Labels{l}, sampleData, client.API))
req, _ = mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
_, err = ing2.Push(ctx, req)
require.NoError(t, err)
response, err = ing2.Query(ctx, request)
require.NoError(t, err)
Expand Down Expand Up @@ -448,32 +426,9 @@ func TestV2IngesterTransfer(t *testing.T) {
})

// Now write a sample to this ingester
const ts = 123000
const val = 456
var (
l = labels.Labels{{Name: labels.MetricName, Value: "foo"}}
sampleData = []client.Sample{
{
TimestampMs: ts,
Value: val,
},
}
expectedResponse = &client.QueryResponse{
Timeseries: []client.TimeSeries{
{
Labels: client.FromLabelsToLabelAdapters(l),
Samples: []client.Sample{
{
Value: val,
TimestampMs: ts,
},
},
},
},
}
)
req, expectedResponse := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
ctx := user.InjectOrgID(context.Background(), userID)
_, err = ing1.Push(ctx, client.ToWriteRequest([]labels.Labels{l}, sampleData, client.API))
_, err = ing1.Push(ctx, req)
require.NoError(t, err)

// Start a second ingester, but let it go into PENDING
Expand Down Expand Up @@ -518,7 +473,8 @@ func TestV2IngesterTransfer(t *testing.T) {
assert.Equal(t, expectedResponse, response)

// Check we can send the same sample again to the new ingester and get the same result
_, err = ing2.Push(ctx, client.ToWriteRequest([]labels.Labels{l}, sampleData, client.API))
req, _ = mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
_, err = ing2.Push(ctx, req)
require.NoError(t, err)
response, err = ing2.Query(ctx, request)
require.NoError(t, err)
Expand Down
Loading