Skip to content

Don't rely on hashes for collecting chunks together #3192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* [ENHANCEMENT] Ingester: added new metric `cortex_ingester_active_series` to track active series more accurately. Also added options to control whether active series tracking is enabled (`-ingester.active-series-enabled`, defaults to false), and how often this metric is updated (`-ingester.active-series-update-period`) and max idle time for series to be considered inactive (`-ingester.active-series-idle-timeout`). #3153
* [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195
* [BUGFIX] Handle hash-collisions in the query path. #3192

## 1.4.0-rc.0 in progress

Expand Down
101 changes: 101 additions & 0 deletions integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,3 +619,104 @@ func TestQuerierWithChunksStorage(t *testing.T) {
assertServiceMetricsPrefixes(t, Querier, querier)
assertServiceMetricsPrefixes(t, TableManager, tableManager)
}

// TestHashCollisionHandling verifies that two different series whose label
// sets collide on the (fast) fingerprint hash are still returned as two
// distinct samples by the query path, instead of being merged into one.
func TestHashCollisionHandling(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
	flags := mergeFlags(ChunksStorageFlags, map[string]string{})

	// Start dependencies.
	dynamo := e2edb.NewDynamoDB()

	consul := e2edb.NewConsul()
	require.NoError(t, s.StartAndWaitReady(consul, dynamo))

	tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
	require.NoError(t, s.StartAndWaitReady(tableManager))

	// Wait until the first table-manager sync has completed, so that we're
	// sure the tables have been created.
	require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_table_manager_sync_success_timestamp_seconds"))

	// Start Cortex components for the write path.
	distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
	ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
	require.NoError(t, s.StartAndWaitReady(distributor, ingester))

	// Wait until the distributor has updated the ring.
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

	// Push two colliding series for a single tenant ("user-0") to Cortex.
	now := time.Now()

	c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-0")
	require.NoError(t, err)

	var series []prompb.TimeSeries
	var expectedVector model.Vector
	// Generate two series which collide on fingerprints and fast fingerprints.
	// The label values below are a known colliding pair for the same
	// metric name — presumably found by search; do not change them casually.
	tsMillis := e2e.TimeToMilliseconds(now)
	metric1 := []prompb.Label{
		{Name: "A", Value: "K6sjsNNczPl"},
		{Name: labels.MetricName, Value: "fingerprint_collision"},
	}
	metric2 := []prompb.Label{
		{Name: "A", Value: "cswpLMIZpwt"},
		{Name: labels.MetricName, Value: "fingerprint_collision"},
	}

	// First colliding series, pushed with value 0, and its expected query result.
	series = append(series, prompb.TimeSeries{
		Labels: metric1,
		Samples: []prompb.Sample{
			{Value: float64(0), Timestamp: tsMillis},
		},
	})
	expectedVector = append(expectedVector, &model.Sample{
		Metric:    prompbLabelsToModelMetric(metric1),
		Value:     model.SampleValue(float64(0)),
		Timestamp: model.Time(tsMillis),
	})
	// Second colliding series, pushed with value 1, and its expected query result.
	series = append(series, prompb.TimeSeries{
		Labels: metric2,
		Samples: []prompb.Sample{
			{Value: float64(1), Timestamp: tsMillis},
		},
	})
	expectedVector = append(expectedVector, &model.Sample{
		Metric:    prompbLabelsToModelMetric(metric2),
		Value:     model.SampleValue(float64(1)),
		Timestamp: model.Time(tsMillis),
	})

	res, err := c.Push(series)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	// Start the read path only after the push, so the querier reads back
	// what the ingester holds.
	querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
	require.NoError(t, s.StartAndWaitReady(querier))

	// Wait until the querier has updated the ring.
	require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

	// Query the series. If the colliding series were merged by the query
	// path, the result would contain a single (wrong) sample instead of two.
	c, err = e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-0")
	require.NoError(t, err)

	result, err := c.Query("fingerprint_collision", now)
	require.NoError(t, err)
	require.Equal(t, model.ValVector, result.Type())
	require.Equal(t, expectedVector, result.(model.Vector))
}

// prompbLabelsToModelMetric converts a slice of prompb labels into the
// equivalent model.Metric map.
func prompbLabelsToModelMetric(pbLabels []prompb.Label) model.Metric {
	out := make(model.Metric, len(pbLabels))

	for _, lbl := range pbLabels {
		out[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
	}

	return out
}
16 changes: 8 additions & 8 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,32 +170,32 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
return nil, err
}

hashToChunkseries := map[model.Fingerprint]ingester_client.TimeSeriesChunk{}
hashToTimeSeries := map[model.Fingerprint]ingester_client.TimeSeries{}
hashToChunkseries := map[string]ingester_client.TimeSeriesChunk{}
hashToTimeSeries := map[string]ingester_client.TimeSeries{}
Comment on lines +173 to +174
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are now misnamed: the map key is not a hash.


for _, result := range results {
response := result.(*ingester_client.QueryStreamResponse)

// Parse any chunk series
for _, series := range response.Chunkseries {
hash := client.FastFingerprint(series.Labels)
existing := hashToChunkseries[hash]
key := client.LabelsToKeyString(client.FromLabelAdaptersToLabels(series.Labels))
existing := hashToChunkseries[key]
existing.Labels = series.Labels
existing.Chunks = append(existing.Chunks, series.Chunks...)
hashToChunkseries[hash] = existing
hashToChunkseries[key] = existing
}

// Parse any time series
for _, series := range response.Timeseries {
hash := client.FastFingerprint(series.Labels)
existing := hashToTimeSeries[hash]
key := client.LabelsToKeyString(client.FromLabelAdaptersToLabels(series.Labels))
existing := hashToTimeSeries[key]
existing.Labels = series.Labels
if existing.Samples == nil {
existing.Samples = series.Samples
} else {
existing.Samples = mergeSamples(existing.Samples, series.Samples)
}
hashToTimeSeries[hash] = existing
hashToTimeSeries[key] = existing
}
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,16 @@ func Fingerprint(labels labels.Labels) model.Fingerprint {
return model.Fingerprint(sum)
}

// LabelsToKeyString serializes a label set into a string suitable for use
// as a map key. The result is a raw byte encoding and is not meant for
// display; use l.String() for printing.
func LabelsToKeyString(l labels.Labels) string {
	// A fixed 1024-byte scratch buffer covers most series (typically under
	// 600 bytes) without a resize. Because this function is inlined at its
	// call sites, the buffer can remain a stack allocation rather than a
	// per-call heap allocation.
	scratch := make([]byte, 0, 1024)
	return string(l.Bytes(scratch))
}

// MarshalJSON implements json.Marshaler.
func (s Sample) MarshalJSON() ([]byte, error) {
t, err := json.Marshal(model.Time(s.TimestampMs))
Expand Down
51 changes: 51 additions & 0 deletions pkg/ingester/client/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"reflect"
"sort"
"strconv"
"testing"
"unsafe"

Expand Down Expand Up @@ -216,3 +217,53 @@ func verifyCollision(t *testing.T, collision bool, ls1 labels.Labels, ls2 labels
t.Errorf("expected different fingerprints for %v (%016x) and %v (%016x)", ls1.String(), Fingerprint(ls1), ls2.String(), Fingerprint(ls2))
}
}

// The main use case for LabelsToKeyString is generating hash keys for
// maps of series; BenchmarkSeriesMap measures exactly that workload.
func BenchmarkSeriesMap(b *testing.B) {
	const numSeries = 100000
	benchmarkSeriesMap(numSeries, b)
}

// benchmarkSeriesMap measures the cost of inserting and looking up
// numSeries distinct series in a map keyed by LabelsToKeyString.
func benchmarkSeriesMap(numSeries int, b *testing.B) {
	allSeries := makeSeries(numSeries)
	index := make(map[string]int, numSeries)

	b.ReportAllocs()
	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		// Insert every series under its serialized-labels key.
		for pos, ls := range allSeries {
			index[LabelsToKeyString(ls)] = pos
		}

		// Every inserted series must be found again.
		for _, ls := range allSeries {
			if _, ok := index[LabelsToKeyString(ls)]; !ok {
				b.Fatal("element missing")
			}
		}

		// Distinct series must map to distinct keys.
		if len(index) != numSeries {
			b.Fatal("the number of series expected:", numSeries, "got:", len(index))
		}
	}
}

// makeSeries builds n label sets that share nine constant labels
// (label0..label8) and differ only in "label9", which carries the index.
func makeSeries(n int) []labels.Labels {
	out := make([]labels.Labels, 0, n)

	for i := 0; i < n; i++ {
		m := make(map[string]string, 10)
		for j := 0; j < 9; j++ {
			suffix := strconv.Itoa(j)
			m["label"+suffix] = "value" + suffix
		}
		m["label9"] = strconv.Itoa(i)
		out = append(out, labels.FromMap(m))
	}

	return out
}
6 changes: 3 additions & 3 deletions pkg/querier/chunk_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...

// Series in the returned set are sorted alphabetically by labels.
func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet {
chunksBySeries := map[model.Fingerprint][]chunk.Chunk{}
chunksBySeries := map[string][]chunk.Chunk{}
for _, c := range chunks {
fp := client.Fingerprint(c.Metric)
chunksBySeries[fp] = append(chunksBySeries[fp], c)
key := client.LabelsToKeyString(c.Metric)
chunksBySeries[key] = append(chunksBySeries[key], c)
}

series := make([]storage.Series, 0, len(chunksBySeries))
Expand Down