Skip to content

Support consistent hashing in cache.NewMemcachedClient #1554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## master / unreleased

* [FEATURE] Add option to use jump hashing to load balance requests to memcached #1554

## 0.1.0 / 2019-08-07

* [CHANGE] HA Tracker flags were renamed to provide more clarity #1465
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ require (
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/bradfitz/gomemcache v0.0.0-20170208213004-1952afaa557d
github.com/cenkalti/backoff v1.0.0 // indirect
github.com/cespare/xxhash v1.1.0
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/cznic/ql v1.2.0 // indirect
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fluent/fluent-logger-golang v1.2.1 // indirect
github.com/fsouza/fake-gcs-server v1.3.0
github.com/go-kit/kit v0.8.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo
github.com/evanphx/json-patch v4.1.0+incompatible h1:K1MDoo4AZ4wU0GIU/fPmtZg7VpzLjCxu+UwBD1FvwOc=
github.com/evanphx/json-patch v4.1.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA=
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb h1:IT4JYU7k4ikYg1SCxNI1/Tieq/NFvh6dzLdgi7eu0tM=
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb/go.mod h1:bH6Xx7IW64qjjJq8M2u4dxNaBiDfKK+z/3eGDpXEQhc=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fluent/fluent-logger-golang v1.2.1 h1:CMA+mw2zMiOGEOarZtaqM3GBWT1IVLNncNi0nKELtmU=
github.com/fluent/fluent-logger-golang v1.2.1/go.mod h1:2/HCT/jTy78yGyeNGQLGQsjF3zzzAuy6Xlk6FCMV5eU=
Expand Down
21 changes: 17 additions & 4 deletions pkg/chunk/cache/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ type MemcachedClient interface {
Set(item *memcache.Item) error
}

type serverSelector interface {
memcache.ServerSelector
SetServers(servers ...string) error
}

// memcachedClient is a memcache client that gets its server list from SRV
// records, and periodically updates that ServerList.
type memcachedClient struct {
*memcache.Client
serverList *memcache.ServerList
serverList serverSelector
hostname string
service string

Expand All @@ -38,6 +43,7 @@ type MemcachedClientConfig struct {
Timeout time.Duration `yaml:"timeout,omitempty"`
MaxIdleConns int `yaml:"max_idle_conns,omitempty"`
UpdateInterval time.Duration `yaml:"update_interval,omitempty"`
ConsistentHash bool `yaml:"consistent_hash,omitempty"`
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
Expand All @@ -47,19 +53,26 @@ func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix, description st
f.IntVar(&cfg.MaxIdleConns, prefix+"memcached.max-idle-conns", 16, description+"Maximum number of idle connections in pool.")
f.DurationVar(&cfg.Timeout, prefix+"memcached.timeout", 100*time.Millisecond, description+"Maximum time to wait before giving up on memcached requests.")
f.DurationVar(&cfg.UpdateInterval, prefix+"memcached.update-interval", 1*time.Minute, description+"Period with which to poll DNS for memcache servers.")
f.BoolVar(&cfg.ConsistentHash, prefix+"memcached.consistent-hash", false, description+"Use consistent hashing to distribute to memcache servers.")
}

// NewMemcachedClient creates a new MemcacheClient that gets its server list
// from SRV and updates the server list on a regular basis.
func NewMemcachedClient(cfg MemcachedClientConfig) MemcachedClient {
var servers memcache.ServerList
client := memcache.NewFromSelector(&servers)
var selector serverSelector
if cfg.ConsistentHash {
selector = &MemcachedJumpHashSelector{}
} else {
selector = &memcache.ServerList{}
}

client := memcache.NewFromSelector(selector)
client.Timeout = cfg.Timeout
client.MaxIdleConns = cfg.MaxIdleConns

newClient := &memcachedClient{
Client: client,
serverList: &servers,
serverList: selector,
hostname: cfg.Host,
service: cfg.Service,
quit: make(chan struct{}),
Expand Down
131 changes: 131 additions & 0 deletions pkg/chunk/cache/memcached_client_selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package cache

import (
"net"
"strings"
"sync"

"github.com/bradfitz/gomemcache/memcache"
"github.com/cespare/xxhash"
"github.com/facette/natsort"
)

// MemcachedJumpHashSelector implements the memcache.ServerSelector
// interface. MemcachedJumpHashSelector utilizes a jump hash to
// distribute keys to servers.
//
// While adding or removing servers only requires 1/N keys to move,
// servers are treated as a stack and can only be pushed/popped.
// Therefore, MemcachedJumpHashSelector works best for servers
// with consistent DNS names where the naturally sorted order
// is predictable.
type MemcachedJumpHashSelector struct {
mu sync.RWMutex
addrs []net.Addr
}

// staticAddr caches the Network() and String() values from
// any net.Addr.
//
// Copied from github.com/bradfitz/gomemcache/selector.go.
type staticAddr struct {
network, str string
}

func newStaticAddr(a net.Addr) net.Addr {
return &staticAddr{
network: a.Network(),
str: a.String(),
}
}

func (a *staticAddr) Network() string { return a.network }
func (a *staticAddr) String() string { return a.str }

// SetServers changes a MemcachedJumpHashSelector's set of servers at
// runtime and is safe for concurrent use by multiple goroutines.
//
// Each server is given equal weight. A server is given more weight
// if it's listed multiple times.
//
// SetServers returns an error if any of the server names fail to
// resolve. No attempt is made to connect to the server. If any
// error occurs, no changes are made to the internal server list.
//
// To minimize the number of rehashes for keys when scaling the
// number of servers in subsequent calls to SetServers, servers
// are stored in natural sort order.
func (s *MemcachedJumpHashSelector) SetServers(servers ...string) error {
sortedServers := make([]string, len(servers))
copy(sortedServers, servers)
natsort.Sort(sortedServers)

naddrs := make([]net.Addr, len(sortedServers))
for i, server := range sortedServers {
if strings.Contains(server, "/") {
addr, err := net.ResolveUnixAddr("unix", server)
if err != nil {
return err
}
naddrs[i] = newStaticAddr(addr)
} else {
tcpAddr, err := net.ResolveTCPAddr("tcp", server)
if err != nil {
return err
}
naddrs[i] = newStaticAddr(tcpAddr)
}
}

s.mu.Lock()
defer s.mu.Unlock()
s.addrs = naddrs
return nil
}

// jumpHash consistently chooses a hash bucket number in the range [0, numBuckets) for the given key.
// numBuckets must be >= 1.
//
// Copied from github.com/dgryski/go-jump/blob/master/jump.go
func jumpHash(key uint64, numBuckets int) int32 {

var b int64 = -1
var j int64

for j < int64(numBuckets) {
b = j
key = key*2862933555777941757 + 1
j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
}

return int32(b)
}

// PickServer returns the server address that a given item
// should be shared onto.
func (s *MemcachedJumpHashSelector) PickServer(key string) (net.Addr, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if len(s.addrs) == 0 {
return nil, memcache.ErrNoServers
} else if len(s.addrs) == 1 {
return s.addrs[0], nil
}
cs := xxhash.Sum64String(key)
idx := jumpHash(cs, len(s.addrs))
return s.addrs[idx], nil
}

// Each iterates over each server and calls the given function.
// If f returns a non-nil error, iteration will stop and that
// error will be returned.
func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error {
s.mu.RLock()
defer s.mu.RUnlock()
for _, def := range s.addrs {
if err := f(def); err != nil {
return err
}
}
return nil
}
65 changes: 65 additions & 0 deletions pkg/chunk/cache/memcached_client_selector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package cache_test

import (
"fmt"
"testing"

"github.com/bradfitz/gomemcache/memcache"
"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/facette/natsort"
"github.com/stretchr/testify/require"
)

func TestNatSort(t *testing.T) {
// Validate that the order of SRV records returned by a DNS
// lookup for a k8s StatefulSet are ordered as expected when
// a natsort is done.
input := []string{
"memcached-10.memcached.cortex.svc.cluster.local.",
"memcached-1.memcached.cortex.svc.cluster.local.",
"memcached-6.memcached.cortex.svc.cluster.local.",
"memcached-3.memcached.cortex.svc.cluster.local.",
"memcached-25.memcached.cortex.svc.cluster.local.",
}

expected := []string{
"memcached-1.memcached.cortex.svc.cluster.local.",
"memcached-3.memcached.cortex.svc.cluster.local.",
"memcached-6.memcached.cortex.svc.cluster.local.",
"memcached-10.memcached.cortex.svc.cluster.local.",
"memcached-25.memcached.cortex.svc.cluster.local.",
}

natsort.Sort(input)
require.Equal(t, expected, input)
}

func TestMemcachedJumpHashSelector_PickSever(t *testing.T) {
s := cache.MemcachedJumpHashSelector{}
err := s.SetServers("google.com:80", "microsoft.com:80", "duckduckgo.com:80")
require.NoError(t, err)

// We store the string representation instead of the net.Addr
// to make sure different IPs were discovered during SetServers
distribution := make(map[string]int)

for i := 0; i < 100; i++ {
key := fmt.Sprintf("key-%d", i)
addr, err := s.PickServer(key)
require.NoError(t, err)
distribution[addr.String()]++
}

// All of the servers should have been returned at least
// once
require.Len(t, distribution, 3)
for _, v := range distribution {
require.NotZero(t, v)
}
}

func TestMemcachedJumpHashSelector_PickSever_ErrNoServers(t *testing.T) {
s := cache.MemcachedJumpHashSelector{}
_, err := s.PickServer("foo")
require.Error(t, memcache.ErrNoServers, err)
}
29 changes: 29 additions & 0 deletions vendor/github.com/facette/natsort/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading