From 9fe78ddd3d244af48cb213775687bdacd11cb1a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 27 Nov 2019 12:37:11 +0100 Subject: [PATCH 1/5] Log advertised address. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ring/kv/memberlist/tcp_transport.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/ring/kv/memberlist/tcp_transport.go b/pkg/ring/kv/memberlist/tcp_transport.go index 021bcf1d6c5..f8348c4f65f 100644 --- a/pkg/ring/kv/memberlist/tcp_transport.go +++ b/pkg/ring/kv/memberlist/tcp_transport.go @@ -351,6 +351,7 @@ func (t *TCPTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, err advertisePort = t.GetAutoBindPort() } + level.Debug(util.Logger).Log("msg", "FinalAdvertiseAddr", "advertiseAddr", advertiseAddr.String(), "advertisePort", advertisePort) return advertiseAddr, advertisePort, nil } From c6d517d6e17869b18bb92a754f4ac1eab4544391 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 27 Nov 2019 13:10:23 +0100 Subject: [PATCH 2/5] Expose probe timeout and interval from memberlist settings. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ring/kv/memberlist/memberlist_client.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index 666c1f87251..688768ee257 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -35,6 +35,8 @@ type Config struct { PushPullInterval time.Duration `yaml:"pull_push_interval"` GossipInterval time.Duration `yaml:"gossip_interval"` GossipNodes int `yaml:"gossip_nodes"` + ProbeInterval time.Duration + ProbeTimeout time.Duration // List of members to join JoinMembers flagext.StringSlice `yaml:"join_members"` @@ -65,6 +67,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) { f.DurationVar(&cfg.GossipInterval, prefix+"memberlist.gossip-interval", 0, "How often to gossip. Uses memberlist LAN defaults if 0.") f.IntVar(&cfg.GossipNodes, prefix+"memberlist.gossip-nodes", 0, "How many nodes to gossip to. Uses memberlist LAN defaults if 0.") f.DurationVar(&cfg.PushPullInterval, prefix+"memberlist.pullpush-interval", 0, "How often to use pull/push sync. Uses memberlist LAN defaults if 0.") + f.DurationVar(&cfg.ProbeInterval, prefix+"memberlist.probe-interval", 0, "How often to probe random nodes. Uses memberlist LAN defaults if 0.") + f.DurationVar(&cfg.ProbeTimeout, prefix+"memberlist.probe-timeout", 0, "Timeout to wait for an ack from a probed node before assuming it is unhealthy.") cfg.TCPTransport.RegisterFlags(f, prefix) } @@ -160,6 +164,12 @@ func NewMemberlistClient(cfg Config, codec codec.Codec) (*Client, error) { if cfg.NodeName != "" { mlCfg.Name = cfg.NodeName } + if cfg.ProbeInterval != 0 { + mlCfg.ProbeInterval = cfg.ProbeInterval + } + if cfg.ProbeTimeout != 0 { + mlCfg.ProbeTimeout = cfg.ProbeTimeout + } mlCfg.LogOutput = newMemberlistLoggerAdapter(util.Logger, false) mlCfg.Transport = tr From e31f7b800a24d94b2b317eada494d983355431fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 27 Nov 2019 14:00:32 +0100 Subject: [PATCH 3/5] Use advertised addr when sending packets. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ring/kv/memberlist/tcp_transport.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/pkg/ring/kv/memberlist/tcp_transport.go b/pkg/ring/kv/memberlist/tcp_transport.go index f8348c4f65f..06b67c54ebe 100644 --- a/pkg/ring/kv/memberlist/tcp_transport.go +++ b/pkg/ring/kv/memberlist/tcp_transport.go @@ -81,6 +81,9 @@ type TCPTransport struct { shutdown int32 + advertiseMu sync.RWMutex + advertiseAddr string + // metrics incomingStreams prometheus.Counter outgoingStreams prometheus.Counter @@ -352,9 +355,24 @@ func (t *TCPTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, err } level.Debug(util.Logger).Log("msg", "FinalAdvertiseAddr", "advertiseAddr", advertiseAddr.String(), "advertisePort", advertisePort) + + t.setAdvertisedAddr(advertiseAddr, advertisePort) return advertiseAddr, advertisePort, nil } +func (t *TCPTransport) setAdvertisedAddr(advertiseAddr net.IP, advertisePort int) { + t.advertiseMu.Lock() + defer t.advertiseMu.Unlock() + addr := net.TCPAddr{IP: advertiseAddr, Port: advertisePort} + t.advertiseAddr = addr.String() +} + +func (t *TCPTransport) getAdvertisedAddr() string { + t.advertiseMu.RLock() + defer t.advertiseMu.RUnlock() + return t.advertiseAddr +} + // WriteTo is a packet-oriented interface that fires off the given // payload to the given address. func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) { @@ -405,7 +423,11 @@ func (t *TCPTransport) writeTo(b []byte, addr string) error { // We need to send our address to the other side, otherwise other side can only see IP and port from TCP header. // But that doesn't match our node address (source port is assigned automatically), which confuses memberlist. // We will announce first listener's address as our address. This is what memberlist's net_transport.go does as well. - ourAddr := t.tcpListeners[0].Addr().String() + // ourAddr := t.tcpListeners[0].Addr().String() + ourAddr := t.getAdvertisedAddr() + if ourAddr == "" { + ourAddr = t.tcpListeners[0].Addr().String() + } if len(ourAddr) > 255 { return fmt.Errorf("local address too long") } From e48d29f6351c5a0d863f23f5e3c1c258ca0332ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 27 Nov 2019 15:38:06 +0100 Subject: [PATCH 4/5] Removed probe interval and timeout. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ring/kv/memberlist/memberlist_client.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index 688768ee257..666c1f87251 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -35,8 +35,6 @@ type Config struct { PushPullInterval time.Duration `yaml:"pull_push_interval"` GossipInterval time.Duration `yaml:"gossip_interval"` GossipNodes int `yaml:"gossip_nodes"` - ProbeInterval time.Duration - ProbeTimeout time.Duration // List of members to join JoinMembers flagext.StringSlice `yaml:"join_members"` @@ -67,8 +65,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) { f.DurationVar(&cfg.GossipInterval, prefix+"memberlist.gossip-interval", 0, "How often to gossip. Uses memberlist LAN defaults if 0.") f.IntVar(&cfg.GossipNodes, prefix+"memberlist.gossip-nodes", 0, "How many nodes to gossip to. Uses memberlist LAN defaults if 0.") f.DurationVar(&cfg.PushPullInterval, prefix+"memberlist.pullpush-interval", 0, "How often to use pull/push sync. Uses memberlist LAN defaults if 0.") - f.DurationVar(&cfg.ProbeInterval, prefix+"memberlist.probe-interval", 0, "How often to probe random nodes. Uses memberlist LAN defaults if 0.") - f.DurationVar(&cfg.ProbeTimeout, prefix+"memberlist.probe-timeout", 0, "Timeout to wait for an ack from a probed node before assuming it is unhealthy.") cfg.TCPTransport.RegisterFlags(f, prefix) } @@ -164,12 +160,6 @@ func NewMemberlistClient(cfg Config, codec codec.Codec) (*Client, error) { if cfg.NodeName != "" { mlCfg.Name = cfg.NodeName } - if cfg.ProbeInterval != 0 { - mlCfg.ProbeInterval = cfg.ProbeInterval - } - if cfg.ProbeTimeout != 0 { - mlCfg.ProbeTimeout = cfg.ProbeTimeout - } mlCfg.LogOutput = newMemberlistLoggerAdapter(util.Logger, false) mlCfg.Transport = tr From 9a1460e6cf69af0a8eb7286ef53e743e0576563e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 29 Nov 2019 15:11:25 +0100 Subject: [PATCH 5/5] Removed unused code, fixed comment. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ring/kv/memberlist/tcp_transport.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/ring/kv/memberlist/tcp_transport.go b/pkg/ring/kv/memberlist/tcp_transport.go index 06b67c54ebe..69a91fec273 100644 --- a/pkg/ring/kv/memberlist/tcp_transport.go +++ b/pkg/ring/kv/memberlist/tcp_transport.go @@ -421,13 +421,10 @@ func (t *TCPTransport) writeTo(b []byte, addr string) error { buf.WriteByte(byte(packet)) // We need to send our address to the other side, otherwise other side can only see IP and port from TCP header. - // But that doesn't match our node address (source port is assigned automatically), which confuses memberlist. - // We will announce first listener's address as our address. This is what memberlist's net_transport.go does as well. - // ourAddr := t.tcpListeners[0].Addr().String() + // But that doesn't match our node address (new TCP connection has new random port), which confuses memberlist. + // So we send our advertised address, so that memberlist on the receiving side can match it with correct node. + // This seems to be important for node probes (pings) done by memberlist. ourAddr := t.getAdvertisedAddr() - if ourAddr == "" { - ourAddr = t.tcpListeners[0].Addr().String() - } if len(ourAddr) > 255 { return fmt.Errorf("local address too long") }