Avoid duplicate label index writes by combining LabelEntryCacheKeys and LabelWriteEntries #1435

Merged · 1 commit · Jun 10, 2019
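
Previously, the write path called GetLabelEntryCacheKeys for the per-bucket dedupe-cache keys and, separately, GetLabelWriteEntries for a flat list of label index entries. Because the entries were not grouped per bucket, a single missing cache key caused every label entry to be rewritten, duplicating index writes for buckets that were already cached. This change combines the two methods into GetCacheKeysAndLabelWriteEntries, which returns one cache key and one []IndexEntry per bucket, matched in order, so the caller writes only the entries whose keys are missing from the write-dedupe cache.

The interface change in brief (signatures as they appear in the diff below):

```go
// Removed:
GetLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
GetLabelEntryCacheKeys(from, through model.Time, userID string, labels labels.Labels) []string

// Added; returns one cache key and one []IndexEntry per bucket, matched in order:
GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error)
```
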
49 changes: 21 additions & 28 deletions pkg/chunk/schema.go
@@ -34,9 +34,9 @@ type Schema interface {
     GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)

     // Should only be used with the seriesStore. TODO: Make seriesStore implement a different interface altogether.
-    GetLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
+    // returns cache key string and []IndexEntry per bucket, matched in order
+    GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error)
     GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
-    GetLabelEntryCacheKeys(from, through model.Time, userID string, labels labels.Labels) []string

     // When doing a read, use these methods to return the list of entries you should query
     GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error)
@@ -97,17 +97,31 @@ func (s schema) GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
     return result, nil
 }

-func (s schema) GetLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
-    var result []IndexEntry
+// returns cache key string and []IndexEntry per bucket, matched in order
+func (s schema) GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error) {
+    var keys []string
+    var indexEntries [][]IndexEntry

     for _, bucket := range s.buckets(from, through, userID) {
+        key := strings.Join([]string{
+            bucket.tableName,
+            bucket.hashKey,
+            string(labelsSeriesID(labels)),
+        },
+            "-",
+        )
+        // This is just encoding to remove invalid characters so that we can put them in memcache.
+        // We're not hashing them as the length of the key is well within memcache bounds. tableName + userid + day + 32Byte(seriesID)
+        key = hex.EncodeToString([]byte(key))
+        keys = append(keys, key)
+
         entries, err := s.entries.GetLabelWriteEntries(bucket, metricName, labels, chunkID)
         if err != nil {
-            return nil, err
+            return nil, nil, err
         }
-        result = append(result, entries...)
+        indexEntries = append(indexEntries, entries)
     }
-    return result, nil
+    return keys, indexEntries, nil
 }

 func (s schema) GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
@@ -124,27 +138,6 @@ func (s schema) GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {

 }

-// Should only be used for v9Schema
-func (s schema) GetLabelEntryCacheKeys(from, through model.Time, userID string, labels labels.Labels) []string {
-    var result []string
-    for _, bucket := range s.buckets(from, through, userID) {
-        key := strings.Join([]string{
-            bucket.tableName,
-            bucket.hashKey,
-            string(labelsSeriesID(labels)),
-        },
-            "-",
-        )
-        // This is just encoding to remove invalid characters so that we can put them in memcache.
-        // We're not hashing them as the length of the key is well within memcache bounds. tableName + userid + day + 32Byte(seriesID)
-        key = hex.EncodeToString([]byte(key))
-
-        result = append(result, key)
-    }
-
-    return result
-}
-
 func (s schema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) {
     var result []IndexQuery

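The cache key built in GetCacheKeysAndLabelWriteEntries is the table name, bucket hash key, and series ID joined with "-", then hex-encoded. A minimal standalone sketch of that encoding, with made-up literal values standing in for what s.buckets() and labelsSeriesID() produce:

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

func main() {
	// Hypothetical values; in pkg/chunk these come from the bucket and
	// from labelsSeriesID(labels), a 32-byte series identifier.
	tableName := "index_2564"
	hashKey := "userID:d18060"
	seriesID := "exampleSeriesIDexampleSeriesID32"

	key := strings.Join([]string{tableName, hashKey, seriesID}, "-")

	// Hex-encoding only strips characters memcached rejects in keys
	// (spaces, control characters); no hashing is needed because even
	// doubled in length by hex, tableName + userID + day + 32-byte
	// seriesID stays well under memcached's 250-byte key limit.
	fmt.Println(hex.EncodeToString([]byte(key)))
}
```
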
22 changes: 12 additions & 10 deletions pkg/chunk/series_store.go
@@ -372,23 +372,25 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error {
 func (c *seriesStore) calculateIndexEntries(from, through model.Time, chunk Chunk) (WriteBatch, []string, error) {
     seenIndexEntries := map[string]struct{}{}
     entries := []IndexEntry{}
-    keysToCache := []string{}

     metricName := chunk.Metric.Get(labels.MetricName)
     if metricName == "" {
         return nil, nil, fmt.Errorf("no MetricNameLabel for chunk")
     }
-    keys := c.schema.GetLabelEntryCacheKeys(from, through, chunk.UserID, chunk.Metric)

+    keys, labelEntries, err := c.schema.GetCacheKeysAndLabelWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey())
+    if err != nil {
+        return nil, nil, err
+    }
     _, _, missing := c.writeDedupeCache.Fetch(context.Background(), keys)
-    if len(missing) != 0 {
-        labelEntries, err := c.schema.GetLabelWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey())
-        if err != nil {
-            return nil, nil, err
+    // keys and labelEntries are matched in order, but Fetch() may
+    // return missing keys in any order so check against all of them.
+    for _, missingKey := range missing {
+        for i, key := range keys {
+            if key == missingKey {
+                entries = append(entries, labelEntries[i]...)
+            }
         }
-
-        entries = append(entries, labelEntries...)
-        keysToCache = missing
     }

     chunkEntries, err := c.schema.GetChunkWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey())
@@ -410,5 +412,5 @@ func (c *seriesStore) calculateIndexEntries(from, through model.Time, chunk Chunk) (WriteBatch, []string, error) {
         }
     }

-    return result, keysToCache, nil
+    return result, missing, nil
 }
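
To see the new dedupe flow in calculateIndexEntries end to end, here is a small self-contained sketch of the matching step. IndexEntry is reduced to a stub and the cache lookup is faked; in the real code the missing keys come from writeDedupeCache.Fetch() and are returned to PutOne so they can be stored in the cache once the write succeeds:

```go
package main

import "fmt"

// IndexEntry stands in for the real pkg/chunk type.
type IndexEntry struct{ HashValue string }

// collectMissingEntries mirrors the loop added in this PR: keys and
// labelEntries are matched by index, but the cache may report missing
// keys in any order, so each missing key is checked against all keys.
func collectMissingEntries(keys []string, labelEntries [][]IndexEntry, missing []string) []IndexEntry {
	var entries []IndexEntry
	for _, missingKey := range missing {
		for i, key := range keys {
			if key == missingKey {
				entries = append(entries, labelEntries[i]...)
			}
		}
	}
	return entries
}

func main() {
	keys := []string{"bucket-day1", "bucket-day2", "bucket-day3"}
	labelEntries := [][]IndexEntry{
		{{HashValue: "entries-for-day1"}},
		{{HashValue: "entries-for-day2"}},
		{{HashValue: "entries-for-day3"}},
	}

	// Pretend only day2's key was missing from the write-dedupe cache:
	// only day2's label entries are written; day1 and day3 are skipped.
	fmt.Println(collectMissingEntries(keys, labelEntries, []string{"bucket-day2"}))
}
```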