Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -81,6 +81,7 @@
* [ENHANCEMENT] Ruler: only load rules that belong to the ruler. Improves rules synching performances when ruler sharding is enabled. #3269
* [ENHANCEMENT] Added `-<prefix>.redis.tls-insecure-skip-verify` flag. #3298
* [ENHANCEMENT] Added `cortex_alertmanager_config_last_reload_successful_seconds` metric to show timestamp of last successful AM config reload. #3289
* [ENHANCEMENT] Blocks storage: reduced number of bucket listing operations to list block content (applies to newly created blocks only). #3363
* [BUGFIX] No-longer-needed ingester operations for queries triggered by queriers and rulers are now canceled. #3178
* [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195
* [BUGFIX] Handle hash-collisions in the query path. #3192
4 changes: 1 addition & 3 deletions go.mod
@@ -10,7 +10,6 @@ require (
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5
github.com/NYTimes/gziphandler v1.1.1
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/armon/go-metrics v0.3.3
github.com/aws/aws-sdk-go v1.35.5
@@ -53,10 +52,9 @@ require (
github.com/sony/gobreaker v0.4.1
github.com/spf13/afero v1.2.2
github.com/stretchr/testify v1.6.1
github.com/thanos-io/thanos v0.13.1-0.20200923175059-57035bf8f843
github.com/thanos-io/thanos v0.13.1-0.20201019130456-f41940581d9a
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/weaveworks/common v0.0.0-20200914083218-61ffdd448099
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50
go.etcd.io/etcd v0.5.0-alpha.5.0.20200520232829-54ba9589114f
go.uber.org/atomic v1.7.0
25 changes: 20 additions & 5 deletions go.sum

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions pkg/querier/block.go
@@ -8,6 +8,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"

"github.com/cortexproject/cortex/pkg/querier/series"
@@ -55,15 +56,15 @@ func (bqss *blockQuerierSeriesSet) Next() bool {
return false
}

currLabels := bqss.series[bqss.next].Labels
currLabels := labelpb.ZLabelsToPromLabels(bqss.series[bqss.next].Labels)
Contributor Author

I have a doubt. After this Thanos upgrade, bqss.series[bqss.next].Labels is a []labelpb.ZLabel, which means the whole protobuf message []byte will be retained until the query execution completes. Don't we risk ending up using more memory than before? In queriers I'm more worried about memory utilisation than about the memory allocation rate.

Thoughts @pstibrany @bwplotka ?

Contributor

If I understand this correctly, the problem you're describing should not be an issue in the querier. Previously the querier would receive the message, extract the individual strings (making a copy), and (possibly) discard the message. Now we skip the extraction at the cost of keeping the message in memory longer. As long as we don't retain these labels somewhere, I think this should decrease memory usage. But it's risky, and we'd better keep an eye on it.

Contributor
@pstibrany pstibrany Oct 20, 2020

(I think *storepb.Series already keeps a reference to the entire original message via c.Raw.Data)

Turns out they make a copy. Perhaps this could be improved as well.

Contributor Author

I think we should investigate further to understand how this behaves. If a query execution is slow, this PR's change may increase memory utilisation.

I'm going to merge this PR to unblock other work based on this, but let's keep an eye on it.

Contributor

As I mentioned offline: thanks to ZLabels, it's really up to you. If you don't want to hold on to the memory, just use the labelpb.DeepCopy function to copy manually (: But you are right: if you hold labels for a longer time, you very likely want to copy them.

Contributor

We are still experimenting, so you are right. It might be risky.
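
To make the trade-off discussed in this thread concrete, here is a minimal, standalone sketch of zero-copy label strings versus a deep copy. It does not use the Thanos API: the `label` type and the helpers `zeroCopyString`, `parseZeroCopy`, and `deepCopy` are hypothetical names invented for illustration, and the "name=value" message format is an assumption standing in for a decoded protobuf buffer. The point is only that a zero-copy string aliases the message buffer (so the buffer stays reachable as long as the label does), while a copied label owns its memory.

```go
package main

import (
	"bytes"
	"fmt"
	"unsafe"
)

// label is a simplified stand-in for a label pair (hypothetical; not labelpb.ZLabel).
type label struct {
	Name, Value string
}

// zeroCopyString reinterprets b as a string without copying.
// The returned string shares b's backing array, so that array cannot be
// garbage-collected while the string is still reachable.
func zeroCopyString(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

// parseZeroCopy splits an assumed "name=value" message into a label whose
// strings alias msg instead of copying out of it.
func parseZeroCopy(msg []byte) label {
	i := bytes.IndexByte(msg, '=')
	if i < 0 {
		return label{Name: zeroCopyString(msg)}
	}
	return label{Name: zeroCopyString(msg[:i]), Value: zeroCopyString(msg[i+1:])}
}

// deepCopy returns a label backed by freshly allocated memory,
// so the original message buffer is no longer referenced by it.
func deepCopy(l label) label {
	return label{
		Name:  string(append([]byte(nil), l.Name...)),
		Value: string(append([]byte(nil), l.Value...)),
	}
}

func main() {
	msg := []byte("job=api")
	shared := parseZeroCopy(msg) // aliases msg
	owned := deepCopy(shared)    // independent copy

	// Overwrite one byte of the buffer to reveal which label still points into it.
	msg[4] = 'A'

	fmt.Println(shared.Value, owned.Value) // prints "Api api"
}
```

In this sketch the aliased label changes when the buffer is overwritten, which shows it still references the message; the copied label does not. Whether copying is worthwhile depends on how long the labels outlive the message, as the comments above note.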

currChunks := bqss.series[bqss.next].Chunks

bqss.next++

// Merge chunks for current series. Chunks may come in multiple responses, but as soon
// as the response has chunks for a new series, we can stop searching. Series are sorted.
// See documentation for StoreClient.Series call for details.
for bqss.next < len(bqss.series) && storepb.CompareLabels(currLabels, bqss.series[bqss.next].Labels) == 0 {
for bqss.next < len(bqss.series) && labels.Compare(currLabels, labelpb.ZLabelsToPromLabels(bqss.series[bqss.next].Labels)) == 0 {
currChunks = append(currChunks, bqss.series[bqss.next].Chunks...)
bqss.next++
}
@@ -85,12 +86,12 @@ func (bqss *blockQuerierSeriesSet) Warnings() storage.Warnings {
}

// newBlockQuerierSeries makes a new blockQuerierSeries. Input labels must be already sorted by name.
func newBlockQuerierSeries(lbls []storepb.Label, chunks []storepb.AggrChunk) *blockQuerierSeries {
func newBlockQuerierSeries(lbls []labels.Label, chunks []storepb.AggrChunk) *blockQuerierSeries {
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].MinTime < chunks[j].MinTime
})

return &blockQuerierSeries{labels: storepb.LabelsToPromLabelsUnsafe(lbls), chunks: chunks}
return &blockQuerierSeries{labels: lbls, chunks: chunks}
}

type blockQuerierSeries struct {
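
The merge loop in Next() and the sort in newBlockQuerierSeries can be illustrated with a small standalone sketch. It assumes, as the code comment in the diff states, that series arrive sorted so that entries for the same series are adjacent. The `chunk` and `series` types and the `mergeAdjacent` helper are hypothetical simplifications (labels are reduced to a single string), not the storepb types or the Cortex implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// chunk is a simplified stand-in for storepb.AggrChunk (hypothetical).
type chunk struct{ MinTime, MaxTime int64 }

// series is a simplified stand-in for storepb.Series (hypothetical).
// Labels are collapsed into one string so equality is a plain comparison.
type series struct {
	Labels string
	Chunks []chunk
}

// mergeAdjacent folds consecutive entries with identical labels into one
// series and sorts each merged series' chunks by MinTime.
// It relies on the input being sorted by labels.
func mergeAdjacent(in []series) []series {
	var out []series
	for next := 0; next < len(in); {
		// Copy the chunk slice so the input is not modified by append or sort.
		curr := series{
			Labels: in[next].Labels,
			Chunks: append([]chunk(nil), in[next].Chunks...),
		}
		next++

		// Keep appending chunks while the following entry has the same labels.
		for next < len(in) && in[next].Labels == curr.Labels {
			curr.Chunks = append(curr.Chunks, in[next].Chunks...)
			next++
		}

		sort.Slice(curr.Chunks, func(i, j int) bool {
			return curr.Chunks[i].MinTime < curr.Chunks[j].MinTime
		})
		out = append(out, curr)
	}
	return out
}

func main() {
	in := []series{
		{Labels: "first", Chunks: []chunk{{100, 200}}},
		{Labels: "first", Chunks: []chunk{{0, 100}}},
		{Labels: "second", Chunks: []chunk{{400, 600}, {200, 400}}},
	}
	for _, s := range mergeAdjacent(in) {
		fmt.Println(s.Labels, s.Chunks)
	}
}
```

Because the loop stops at the first entry with different labels, it only works when equal-label entries are contiguous; unsorted input would yield duplicate series rather than an error.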
37 changes: 21 additions & 16 deletions pkg/querier/block_test.go
@@ -14,6 +14,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"

"github.com/cortexproject/cortex/pkg/util"
@@ -39,7 +40,7 @@ func TestBlockQuerierSeries(t *testing.T) {
},
"should return series on success": {
series: &storepb.Series{
Labels: []storepb.Label{
Labels: []labelpb.ZLabel{
{Name: "foo", Value: "bar"},
},
Chunks: []storepb.AggrChunk{
@@ -56,7 +57,7 @@ func TestBlockQuerierSeries(t *testing.T) {
},
"should return error on failure while reading encoded chunk data": {
series: &storepb.Series{
Labels: []storepb.Label{{Name: "foo", Value: "bar"}},
Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}},
Chunks: []storepb.AggrChunk{
{MinTime: minTimestamp.Unix() * 1000, MaxTime: maxTimestamp.Unix() * 1000, Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: []byte{0, 1}}},
},
@@ -70,7 +71,7 @@ func TestBlockQuerierSeries(t *testing.T) {
testData := testData

t.Run(testName, func(t *testing.T) {
series := newBlockQuerierSeries(testData.series.Labels, testData.series.Chunks)
series := newBlockQuerierSeries(labelpb.ZLabelsToPromLabels(testData.series.Labels), testData.series.Chunks)

assert.Equal(t, testData.expectedMetric, series.Labels())

@@ -119,23 +120,23 @@ func TestBlockQuerierSeriesSet(t *testing.T) {
series: []*storepb.Series{
// first, with one chunk.
{
Labels: mkLabels("__name__", "first", "a", "a"),
Labels: mkZLabels("__name__", "first", "a", "a"),
Chunks: []storepb.AggrChunk{
createAggrChunkWithSineSamples(now, now.Add(100*time.Second), 3*time.Millisecond), // ceil(100 / 0.003) samples (= 33334)
},
},

// continuation of previous series. Must have exact same labels.
{
Labels: mkLabels("__name__", "first", "a", "a"),
Labels: mkZLabels("__name__", "first", "a", "a"),
Chunks: []storepb.AggrChunk{
createAggrChunkWithSineSamples(now.Add(100*time.Second), now.Add(200*time.Second), 3*time.Millisecond), // ceil(100 / 0.003) samples more, 66668 in total
},
},

// second, with multiple chunks
{
Labels: mkLabels("__name__", "second"),
Labels: mkZLabels("__name__", "second"),
Chunks: []storepb.AggrChunk{
// unordered chunks
createAggrChunkWithSineSamples(now.Add(400*time.Second), now.Add(600*time.Second), 5*time.Millisecond), // 200 / 0.005 (= 40000 samples, = 120000 in total)
@@ -146,13 +147,13 @@ func TestBlockQuerierSeriesSet(t *testing.T) {

// overlapping
{
Labels: mkLabels("__name__", "overlapping"),
Labels: mkZLabels("__name__", "overlapping"),
Chunks: []storepb.AggrChunk{
createAggrChunkWithSineSamples(now, now.Add(10*time.Second), 5*time.Millisecond), // 10 / 0.005 = 2000 samples
},
},
{
Labels: mkLabels("__name__", "overlapping"),
Labels: mkZLabels("__name__", "overlapping"),
Chunks: []storepb.AggrChunk{
// 10 / 0.005 = 2000 samples, but first 1000 are overlapping with previous series, so this chunk only contributes 1000
createAggrChunkWithSineSamples(now.Add(5*time.Second), now.Add(15*time.Second), 5*time.Millisecond),
@@ -161,29 +162,29 @@ func TestBlockQuerierSeriesSet(t *testing.T) {

// overlapping 2. Chunks here come in wrong order.
{
Labels: mkLabels("__name__", "overlapping2"),
Labels: mkZLabels("__name__", "overlapping2"),
Chunks: []storepb.AggrChunk{
// entire range overlaps with the next chunk, so this chunks contributes 0 samples (it will be sorted as second)
createAggrChunkWithSineSamples(now.Add(3*time.Second), now.Add(7*time.Second), 5*time.Millisecond),
},
},
{
Labels: mkLabels("__name__", "overlapping2"),
Labels: mkZLabels("__name__", "overlapping2"),
Chunks: []storepb.AggrChunk{
// this chunk has completely overlaps previous chunk. Since its minTime is lower, it will be sorted as first.
createAggrChunkWithSineSamples(now, now.Add(10*time.Second), 5*time.Millisecond), // 10 / 0.005 = 2000 samples
},
},
{
Labels: mkLabels("__name__", "overlapping2"),
Labels: mkZLabels("__name__", "overlapping2"),
Chunks: []storepb.AggrChunk{
// no samples
createAggrChunkWithSineSamples(now, now, 5*time.Millisecond),
},
},

{
Labels: mkLabels("__name__", "overlapping2"),
Labels: mkZLabels("__name__", "overlapping2"),
Chunks: []storepb.AggrChunk{
// 2000 samples more (10 / 0.005)
createAggrChunkWithSineSamples(now.Add(20*time.Second), now.Add(30*time.Second), 5*time.Millisecond),
@@ -262,11 +263,11 @@ func createAggrChunk(minTime, maxTime int64, samples ...promql.Point) storepb.Ag
}
}

func mkLabels(s ...string) []storepb.Label {
result := []storepb.Label{}
func mkZLabels(s ...string) []labelpb.ZLabel {
var result []labelpb.ZLabel

for i := 0; i+1 < len(s); i = i + 2 {
result = append(result, storepb.Label{
result = append(result, labelpb.ZLabel{
Name: s[i],
Value: s[i+1],
})
@@ -275,6 +276,10 @@ func mkLabels(s ...string) []storepb.Label {
return result
}

func mkLabels(s ...string) []labels.Label {
return labelpb.ZLabelsToPromLabels(mkZLabels(s...))
}

func Benchmark_newBlockQuerierSeries(b *testing.B) {
lbls := mkLabels(
"__name__", "test",
@@ -308,7 +313,7 @@ func Benchmark_blockQuerierSeriesSet_iteration(b *testing.B) {
// Generate series.
series := make([]*storepb.Series, 0, numSeries)
for seriesID := 0; seriesID < numSeries; seriesID++ {
lbls := mkLabels("__name__", "test", "series_id", strconv.Itoa(seriesID))
lbls := mkZLabels("__name__", "test", "series_id", strconv.Itoa(seriesID))
chunks := make([]storepb.AggrChunk, 0, numChunksPerSeries)

// Create chunks with 1 sample per second.
11 changes: 6 additions & 5 deletions pkg/querier/blocks_store_queryable_test.go
@@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/store/hintspb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
@@ -624,8 +625,8 @@ func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T)
func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) {
block1 := ulid.MustNew(1, nil)
block2 := ulid.MustNew(2, nil)
series1 := []storepb.Label{{Name: "__name__", Value: "metric_1"}}
series2 := []storepb.Label{{Name: "__name__", Value: "metric_2"}}
series1 := []labelpb.ZLabel{{Name: "__name__", Value: "metric_1"}}
series2 := []labelpb.ZLabel{{Name: "__name__", Value: "metric_2"}}

series1Samples := []promql.Point{
{T: 1589759955000, V: 1},
@@ -737,8 +738,8 @@ func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) {
require.NoError(t, err)
require.Len(t, matrix, 2)

assert.Equal(t, storepb.LabelsToPromLabels(series1), matrix[0].Metric)
assert.Equal(t, storepb.LabelsToPromLabels(series2), matrix[1].Metric)
assert.Equal(t, labelpb.ZLabelsToPromLabels(series1), matrix[0].Metric)
assert.Equal(t, labelpb.ZLabelsToPromLabels(series2), matrix[1].Metric)
assert.Equal(t, series1Samples, matrix[0].Points)
assert.Equal(t, series2Samples, matrix[1].Points)
}
@@ -840,7 +841,7 @@ func mockSeriesResponse(lbls labels.Labels, timeMillis int64, value float64) *st
return &storepb.SeriesResponse{
Result: &storepb.SeriesResponse_Series{
Series: &storepb.Series{
Labels: storepb.PromLabelsToLabels(lbls),
Labels: labelpb.ZLabelsFromPromLabels(lbls),
Chunks: []storepb.AggrChunk{
{MinTime: timeMillis, MaxTime: timeMillis, Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chunkData}},
},
5 changes: 3 additions & 2 deletions pkg/storegateway/bucket_stores_test.go
@@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/weaveworks/common/logging"
"go.uber.org/atomic"
@@ -68,7 +69,7 @@ func TestBucketStores_InitialSync(t *testing.T) {
require.NoError(t, err)
assert.Empty(t, warnings)
require.Len(t, seriesSet, 1)
assert.Equal(t, []storepb.Label{{Name: labels.MetricName, Value: metricName}}, seriesSet[0].Labels)
assert.Equal(t, []labelpb.ZLabel{{Name: labels.MetricName, Value: metricName}}, seriesSet[0].Labels)
}

// Query series of another user.
@@ -146,7 +147,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
require.NoError(t, err)
assert.Empty(t, warnings)
assert.Len(t, seriesSet, 1)
assert.Equal(t, []storepb.Label{{Name: labels.MetricName, Value: metricName}}, seriesSet[0].Labels)
assert.Equal(t, []labelpb.ZLabel{{Name: labels.MetricName, Value: metricName}}, seriesSet[0].Labels)

assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_bucket_store_blocks_loaded Number of currently loaded blocks.
3 changes: 2 additions & 1 deletion pkg/storegateway/gateway_test.go
@@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"

"github.com/cortexproject/cortex/pkg/ring"
@@ -695,7 +696,7 @@ func TestStoreGateway_SeriesQueryingShouldRemoveExternalLabels(t *testing.T) {
actual := srv.SeriesSet[seriesID]

// Ensure Cortex external labels have been removed.
assert.Equal(t, []storepb.Label{{Name: "series_id", Value: strconv.Itoa(seriesID)}}, actual.Labels)
assert.Equal(t, []labelpb.ZLabel{{Name: "series_id", Value: strconv.Itoa(seriesID)}}, actual.Labels)

// Ensure samples have been correctly queried. The Thanos store also deduplicate samples
// in most cases, but it's not strictly required guaranteeing deduplication at this stage.
6 changes: 4 additions & 2 deletions tools/blocksconvert/builder/tsdb.go
@@ -19,6 +19,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/pkg/chunk"
@@ -204,8 +205,9 @@ func (d *tsdbBuilder) finishBlock(source string, labels map[string]string) (ulid
},

Thanos: metadata.Thanos{
Labels: labels,
Source: metadata.SourceType(source),
Labels: labels,
Source: metadata.SourceType(source),
SegmentFiles: block.GetSegmentFiles(d.tmpBlockDir),
},
}

18 changes: 18 additions & 0 deletions vendor/github.com/thanos-io/thanos/pkg/block/block.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/github.com/thanos-io/thanos/pkg/block/index.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions vendor/github.com/thanos-io/thanos/pkg/compact/compact.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
