Skip to content

Memberlist integration test fix #2127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ require (
github.com/thanos-io/thanos v0.8.1-0.20200109203923-552ffa4c1a0d
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/uber-go/atomic v1.4.0
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1
github.com/weaveworks/common v0.0.0-20200201141823-27e183090ab1
go.etcd.io/bbolt v1.3.3
go.etcd.io/etcd v0.0.0-20190709142735-eb7dd97135a5
go.uber.org/atomic v1.5.0
golang.org/x/net v0.0.0-20191112182307-2180aed22343
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/api v0.14.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,6 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/uber-go/atomic v1.4.0 h1:yOuPqEq4ovnhEjpHmfFwsqBXDYbQeT6Nb0bwD6XnD5o=
github.com/uber-go/atomic v1.4.0/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-client-go v2.20.1+incompatible h1:HgqpYBng0n7tLJIlyT4kPCIv5XgCsF+kai1NnnrJzEU=
github.com/uber/jaeger-client-go v2.20.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
Expand Down
1 change: 1 addition & 0 deletions integration/integration_memberlist_single_binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func startSingleBinary(s *framework.Scenario, name string, join string) error {
"-ingester.observe-period": "5s", // to avoid conflicts in tokens
"-ring.store": "memberlist",
"-memberlist.bind-port": "8000",
"-memberlist.pullpush-interval": "3s", // speed up state convergence to make test faster and avoid flakiness
}

if join != "" {
Expand Down
62 changes: 44 additions & 18 deletions pkg/ring/kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/hashicorp/memberlist"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -120,6 +121,9 @@ type KV struct {
memberlist *memberlist.Memberlist
broadcasts *memberlist.TransmitLimitedQueue

// Disabled on Stop()
casBroadcastsEnabled *atomic.Bool

// KV Store.
storeMu sync.Mutex
store map[string]valueDesc
Expand Down Expand Up @@ -224,42 +228,43 @@ func NewKV(cfg KVConfig) (*KV, error) {
// As we don't use UDP for sending packets, we can use higher value here.
mlCfg.UDPBufferSize = 10 * 1024 * 1024

memberlistClient := &KV{
cfg: cfg,
store: make(map[string]valueDesc),
codecs: make(map[string]codec.Codec),
watchers: make(map[string][]chan string),
prefixWatchers: make(map[string][]chan string),
shutdown: make(chan struct{}),
maxCasRetries: maxCasRetries,
mlkv := &KV{
cfg: cfg,
store: make(map[string]valueDesc),
codecs: make(map[string]codec.Codec),
watchers: make(map[string][]chan string),
prefixWatchers: make(map[string][]chan string),
shutdown: make(chan struct{}),
maxCasRetries: maxCasRetries,
casBroadcastsEnabled: atomic.NewBool(true),
}

mlCfg.Delegate = memberlistClient
mlCfg.Delegate = mlkv

list, err := memberlist.Create(mlCfg)
if err != nil {
return nil, fmt.Errorf("failed to create memberlist: %v", err)
}

// finish delegate initialization
memberlistClient.memberlist = list
memberlistClient.broadcasts = &memberlist.TransmitLimitedQueue{
mlkv.memberlist = list
mlkv.broadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: list.NumMembers,
RetransmitMult: cfg.RetransmitMult,
}

// Almost ready...
memberlistClient.createAndRegisterMetrics()
mlkv.createAndRegisterMetrics()

for _, c := range cfg.Codecs {
memberlistClient.codecs[c.CodecID()] = c
mlkv.codecs[c.CodecID()] = c
}

// Join the cluster
if len(cfg.JoinMembers) > 0 {
reached, err := memberlistClient.JoinMembers(cfg.JoinMembers)
reached, err := mlkv.JoinMembers(cfg.JoinMembers)
if err != nil && cfg.AbortIfJoinFails {
_ = memberlistClient.memberlist.Shutdown()
_ = mlkv.memberlist.Shutdown()
return nil, err
}

Expand All @@ -270,7 +275,7 @@ func NewKV(cfg KVConfig) (*KV, error) {
}
}

return memberlistClient, nil
return mlkv, nil
}

// GetCodec returns codec for given ID or nil.
Expand All @@ -294,7 +299,23 @@ func (m *KV) JoinMembers(members []string) (int, error) {
func (m *KV) Stop() {
level.Info(util.Logger).Log("msg", "leaving memberlist cluster")

// TODO: should we empty our broadcast queue before leaving? That would make sure that we have sent out everything we wanted.
m.casBroadcastsEnabled.Store(false)

// Wait until broadcast queue is empty, but don't wait for too long.
// Also don't wait if there is just one node left.
// Problem is that broadcast queue is also filled up by state changes received from other nodes,
// so it may never be empty in a busy cluster. However, we generally only care about messages
// generated on this node via CAS, and those are disabled now (via casBroadcastsEnabled), and should be able
// to get out in this timeout.

waitTimeout := time.Now().Add(10 * time.Second)
for m.broadcasts.NumQueued() > 0 && m.memberlist.NumMembers() > 1 && time.Now().Before(waitTimeout) {
time.Sleep(250 * time.Millisecond)
}

if cnt := m.broadcasts.NumQueued(); cnt > 0 {
level.Warn(util.Logger).Log("msg", "broadcast messages left in queue", "count", cnt, "nodes", m.memberlist.NumMembers())
}

err := m.memberlist.Leave(m.cfg.LeaveTimeout)
if err != nil {
Expand Down Expand Up @@ -524,7 +545,12 @@ outer:
if change != nil {
m.casSuccesses.Inc()
m.notifyWatchers(key)
m.broadcastNewValue(key, change, newver, codec)

if m.casBroadcastsEnabled.Load() {
m.broadcastNewValue(key, change, newver, codec)
} else {
level.Warn(util.Logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key)
}
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ring/kv/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/util"

"github.com/go-kit/kit/log/level"
"github.com/uber-go/atomic"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/runtimeconfig/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/uber-go/atomic"
"go.uber.org/atomic"
"gopkg.in/yaml.v2"
)

Expand Down
15 changes: 0 additions & 15 deletions vendor/github.com/uber-go/atomic/.codecov.yml

This file was deleted.

11 changes: 0 additions & 11 deletions vendor/github.com/uber-go/atomic/.gitignore

This file was deleted.

27 changes: 0 additions & 27 deletions vendor/github.com/uber-go/atomic/.travis.yml

This file was deleted.

19 changes: 0 additions & 19 deletions vendor/github.com/uber-go/atomic/LICENSE.txt

This file was deleted.

51 changes: 0 additions & 51 deletions vendor/github.com/uber-go/atomic/Makefile

This file was deleted.

36 changes: 0 additions & 36 deletions vendor/github.com/uber-go/atomic/README.md

This file was deleted.

Loading