Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
* [ENHANCEMENT] Add `-cassandra.reconnect-interval` to allow specifying the reconnect interval to a Cassandra server that has been marked `DOWN` by the gocql driver. Also change the default value of the reconnect interval from `60s` to `1s`. #2687
* [ENHANCEMENT] Experimental TSDB: Applied a jitter to the periodic bucket scans in order to better distribute bucket operations over time and increase the probability of hitting the shared cache (if configured). #2693
* [ENHANCEMENT] Experimental TSDB: Series limit per user and per metric now work in TSDB blocks. #2676
* [ENHANCEMENT] Experimental Memberlist: Added ability to periodically rejoin the memberlist cluster. #2724
* [BUGFIX] Ruler: Ensure temporary rule files with special characters are properly mapped and cleaned up. #2506
* [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372
* [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400
Expand Down
9 changes: 9 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2324,6 +2324,15 @@ The `memberlist_config` configures the Gossip memberlist.
# CLI flag: -memberlist.abort-if-join-fails
[abort_if_cluster_join_fails: <boolean> | default = true]

# If not 0, how often to rejoin the cluster. Occasional rejoin can help to fix
# the cluster split issue, and is harmless otherwise. For example, when using
# only a few components as seed nodes (via -memberlist.join), it's recommended
# to use rejoin. If -memberlist.join points to a dynamic service that resolves
# to all gossiping nodes (e.g. a Kubernetes headless service), then rejoin is
# not needed.
# CLI flag: -memberlist.rejoin-interval
[rejoin_interval: <duration> | default = 0s]

# How long to keep LEFT ingesters in the ring.
# CLI flag: -memberlist.left-ingesters-timeout
[left_ingesters_timeout: <duration> | default = 5m]
Expand Down
27 changes: 25 additions & 2 deletions pkg/ring/kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ type KVConfig struct {
MaxJoinBackoff time.Duration `yaml:"max_join_backoff"`
MaxJoinRetries int `yaml:"max_join_retries"`
AbortIfJoinFails bool `yaml:"abort_if_cluster_join_fails"`
RejoinInterval time.Duration `yaml:"rejoin_interval"`

// Remove LEFT ingesters from ring after this timeout.
LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout"`
Expand Down Expand Up @@ -168,6 +169,7 @@ func (cfg *KVConfig) RegisterFlags(f *flag.FlagSet, prefix string) {
f.DurationVar(&cfg.MaxJoinBackoff, prefix+"memberlist.max-join-backoff", 1*time.Minute, "Max backoff duration to join other cluster members.")
f.IntVar(&cfg.MaxJoinRetries, prefix+"memberlist.max-join-retries", 10, "Max number of retries to join other cluster members.")
f.BoolVar(&cfg.AbortIfJoinFails, prefix+"memberlist.abort-if-join-fails", true, "If this node fails to join memberlist cluster, abort.")
f.DurationVar(&cfg.RejoinInterval, prefix+"memberlist.rejoin-interval", 0, "If not 0, how often to rejoin the cluster. Occasional rejoin can help to fix the cluster split issue, and is harmless otherwise. For example when using only few components as a seed nodes (via -memberlist.join), then it's recommended to use rejoin. If -memberlist.join points to dynamic service that resolves to all gossiping nodes (eg. Kubernetes headless service), then rejoin is not needed.")
f.DurationVar(&cfg.LeftIngestersTimeout, prefix+"memberlist.left-ingesters-timeout", 5*time.Minute, "How long to keep LEFT ingesters in the ring.")
f.DurationVar(&cfg.LeaveTimeout, prefix+"memberlist.leave-timeout", 5*time.Second, "Timeout for leaving memberlist cluster.")
f.DurationVar(&cfg.GossipInterval, prefix+"memberlist.gossip-interval", 0, "How often to gossip. Uses memberlist LAN defaults if 0.")
Expand Down Expand Up @@ -388,8 +390,29 @@ func (m *KV) running(ctx context.Context) error {
}
}

<-ctx.Done()
return nil
var tickerChan <-chan time.Time = nil
if m.cfg.RejoinInterval > 0 {
t := time.NewTicker(m.cfg.RejoinInterval)
defer t.Stop()

tickerChan = t.C
}

for {
select {
case <-tickerChan:
reached, err := m.memberlist.Join(m.cfg.JoinMembers)
if err == nil {
level.Info(util.Logger).Log("msg", "re-joined memberlist cluster", "reached_nodes", reached)
} else {
// Don't report error from rejoin, otherwise KV service would be stopped completely.
level.Warn(util.Logger).Log("msg", "re-joining memberlist cluster failed", "err", err)
}

case <-ctx.Done():
return nil
}
}
}

// GetCodec returns codec for given ID or nil.
Expand Down
56 changes: 55 additions & 1 deletion pkg/ring/kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@ import (
"math"
"math/rand"
"net"
"os"
"sort"
"sync"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/test"
)

const ACTIVE = 1
Expand Down Expand Up @@ -760,7 +764,7 @@ func TestMemberlistFailsToJoin(t *testing.T) {
func getFreePorts(count int) ([]int, error) {
var ports []int
for i := 0; i < count; i++ {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -980,3 +984,53 @@ func TestGenerateRandomSuffix(t *testing.T) {
require.NotEqual(t, h1, h2)
require.NotEqual(t, h2, h3)
}

// TestRejoin checks that a KV configured with a RejoinInterval gets back to a
// two-member cluster after its only seed node is stopped and started again.
//
// The seed KV is configured without JoinMembers, so once it has been restarted
// it never contacts the other node on its own; the member count observed by
// the second KV can only return to 2 through that KV's periodic rejoin.
//
// Note: the test replaces the package-global util.Logger and does not restore it.
func TestRejoin(t *testing.T) {
	util.Logger = log.NewLogfmtLogger(os.Stdout)

	freePorts, err := getFreePorts(2)
	require.NoError(t, err)
	seedPort, rejoinerPort := freePorts[0], freePorts[1]

	seedCfg := KVConfig{
		TCPTransport: TCPTransportConfig{
			BindAddrs: []string{"localhost"},
			BindPort:  seedPort,
		},

		RandomizeNodeName: true,
		Codecs:            []codec.Codec{dataCodec{}},
		AbortIfJoinFails:  false,
	}

	// Same settings as the seed, but bound to its own port, pointed at the
	// seed, and rejoining every second.
	rejoinerCfg := seedCfg
	rejoinerCfg.TCPTransport.BindPort = rejoinerPort
	rejoinerCfg.JoinMembers = []string{fmt.Sprintf("localhost:%d", seedPort)}
	rejoinerCfg.RejoinInterval = time.Second

	ctx := context.Background()

	seed := NewKV(seedCfg)
	require.NoError(t, services.StartAndAwaitRunning(ctx, seed))
	defer services.StopAndAwaitTerminated(ctx, seed) //nolint:errcheck

	rejoiner := NewKV(rejoinerCfg)
	require.NoError(t, services.StartAndAwaitRunning(ctx, rejoiner))
	defer services.StopAndAwaitTerminated(ctx, rejoiner) //nolint:errcheck

	// Cluster size as seen by the rejoining KV.
	observedMembers := func() interface{} {
		return rejoiner.memberlist.NumMembers()
	}

	const pollTimeout = 5 * time.Second

	// Both KVs are up: the rejoiner should see itself and the seed.
	test.Poll(t, pollTimeout, 2, observedMembers)

	// Take the seed down; the rejoiner should be left alone in the cluster.
	require.NoError(t, services.StopAndAwaitTerminated(ctx, seed))
	test.Poll(t, pollTimeout, 1, observedMembers)

	// Bring the seed back on the same port. It is not configured to join
	// anyone, so the cluster only re-forms because the rejoiner keeps rejoining.
	restartedSeed := NewKV(seedCfg)
	require.NoError(t, services.StartAndAwaitRunning(ctx, restartedSeed))
	defer services.StopAndAwaitTerminated(ctx, restartedSeed) //nolint:errcheck

	test.Poll(t, pollTimeout, 2, observedMembers)
}