[Experimental] Memberlist singleton and support for multiple codecs #2016

Merged: 29 commits, Feb 12, 2020
d788adc
Added codec ID, to be used between memberlist clients.
pstibrany Jan 15, 2020
8e4e8d8
Separated memberlist.Client from memberlist.KV.
pstibrany Jan 20, 2020
65b1a61
Replaced custom serialization with protobuf serialization.
pstibrany Jan 20, 2020
d279c92
Codec ID is now part of the message exchanged between memberlist nodes.
pstibrany Jan 21, 2020
80a0c15
Added tests for multi-codecs KV.
pstibrany Jan 21, 2020
fd6f816
Fixed client creation.
pstibrany Jan 21, 2020
13a3b3d
Removed Stop method from kv.Client interface.
pstibrany Jan 21, 2020
bd9f797
Memberlist KV is now top-level component.
pstibrany Jan 21, 2020
1893c53
All codecs are registered to memberlist KV at the beginning.
pstibrany Jan 21, 2020
cd7159c
Report unknown codec errors as invalid messages.
pstibrany Jan 21, 2020
9a07c27
Updated CHANGELOG.md
pstibrany Jan 21, 2020
3c62bbe
Typos
pstibrany Jan 21, 2020
cfc07cd
Comments.
pstibrany Jan 21, 2020
448e70e
Fixes
pstibrany Jan 21, 2020
55d0f04
Added yaml tag to ignore function.
pstibrany Jan 21, 2020
c9d6011
Fixed unintended import reorder.
pstibrany Jan 21, 2020
b510414
Comments.
pstibrany Jan 21, 2020
5133bc8
Comments.
pstibrany Jan 21, 2020
5aae6eb
Return error
pstibrany Jan 22, 2020
fd1fe3f
Updated docs
pstibrany Jan 22, 2020
1d3aa66
Improved comments in LocalState and MergeRemoteState
pstibrany Feb 12, 2020
1be6245
Allow ruler and distributor use memberlist.KV for their internal rings.
pstibrany Feb 12, 2020
de08cc4
Added test to verify that Memberlist works in single-binary mode,
pstibrany Feb 12, 2020
7f05293
Updated CHANGELOG.md, moved to CHANGE because of protocol break.
pstibrany Feb 12, 2020
91fe063
Use "memberlist" in YAML config.
pstibrany Feb 12, 2020
bca87c2
Updated comment.
pstibrany Feb 12, 2020
7dbf90f
Updated comments and messages.
pstibrany Feb 12, 2020
11ff1a5
Fixed config.
pstibrany Feb 12, 2020
fb4f211
Compactor can now also use memberlist KV store.
pstibrany Feb 12, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
* [CHANGE] Moved `--store.min-chunk-age` to the Querier config as `--querier.query-store-after`, allowing the store to be skipped during query time if the metrics wouldn't be found. The YAML config option `ingestermaxquerylookback` has been renamed to `query_ingesters_within` to match its CLI flag. #1893
* `--store.min-chunk-age` has been removed
* `--querier.query-store-after` has been added in its place.
* [CHANGE] Experimental Memberlist KV store can now be used in single-binary Cortex. Previous attempts to use it would fail with a panic. This change also breaks the existing binary protocol used to exchange gossip messages, so this version will not be able to understand a gossiped Ring when used in combination with the previous version of Cortex. The easiest way to upgrade is to shut down the old Cortex installation and restart it with the new version. Incremental rollout works too, but with reduced functionality until all components run the same version. #2016
* [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
* `--experimental.distributor.user-subring-size`
* [FEATURE] Added flag `-experimental.ruler.enable-api` to enable the ruler api which implements the Prometheus API `/api/v1/rules` and `/api/v1/alerts` endpoints under the configured `-http.prefix`. #1999
1 change: 1 addition & 0 deletions Makefile
@@ -56,6 +56,7 @@ pkg/querier/queryrange/queryrange.pb.go: pkg/querier/queryrange/queryrange.proto
pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto
pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto
pkg/ring/kv/memberlist/kv.pb.go: pkg/ring/kv/memberlist/kv.proto

all: $(UPTODATE_FILES)
test: protos
46 changes: 17 additions & 29 deletions docs/configuration/config-file-reference.md
@@ -102,6 +102,9 @@ runtime_config:
# File with the configuration that can be updated in runtime.
# CLI flag: -runtime-config.file
[file: <string> | default = ""]

# The memberlist_config configures the Gossip memberlist.
[memberlist: <memberlist_config>]
```

## `server_config`
@@ -271,10 +274,6 @@ ha_tracker:
# The CLI flags prefix for this block config is: distributor.ha-tracker
[etcd: <etcd_config>]

# The memberlist_config configures the Gossip memberlist.
# The CLI flags prefix for this block config is: distributor.ha-tracker
[memberlist: <memberlist_config>]

multi:
# Primary backend storage used by multi-client.
# CLI flag: -distributor.ha-tracker.multi.primary
@@ -328,10 +327,6 @@ ring:
# The CLI flags prefix for this block config is: distributor.ring
[etcd: <etcd_config>]

# The memberlist_config configures the Gossip memberlist.
# The CLI flags prefix for this block config is: distributor.ring
[memberlist: <memberlist_config>]

multi:
# Primary backend storage used by multi-client.
# CLI flag: -distributor.ring.multi.primary
@@ -403,9 +398,6 @@ lifecycler:
# The etcd_config configures the etcd client.
[etcd: <etcd_config>]

# The memberlist_config configures the Gossip memberlist.
[memberlist: <memberlist_config>]

multi:
# Primary backend storage used by multi-client.
# CLI flag: -multi.primary
@@ -740,10 +732,6 @@ ring:
# The CLI flags prefix for this block config is: ruler.ring
[etcd: <etcd_config>]

# The memberlist_config configures the Gossip memberlist.
# The CLI flags prefix for this block config is: ruler.ring
[memberlist: <memberlist_config>]

multi:
# Primary backend storage used by multi-client.
# CLI flag: -ruler.ring.multi.primary
@@ -1739,62 +1727,62 @@ The `memberlist_config` configures the Gossip memberlist.

```yaml
# Name of the node in memberlist cluster. Defaults to hostname.
# CLI flag: -<prefix>.memberlist.nodename
# CLI flag: -memberlist.nodename
[node_name: <string> | default = ""]

# The timeout for establishing a connection with a remote node, and for
# read/write operations. Uses memberlist LAN defaults if 0.
# CLI flag: -<prefix>.memberlist.stream-timeout
# CLI flag: -memberlist.stream-timeout
[stream_timeout: <duration> | default = 0s]

# Multiplication factor used when sending out messages (factor * log(N+1)).
# CLI flag: -<prefix>.memberlist.retransmit-factor
# CLI flag: -memberlist.retransmit-factor
[retransmit_factor: <int> | default = 0]

# How often to use pull/push sync. Uses memberlist LAN defaults if 0.
# CLI flag: -<prefix>.memberlist.pullpush-interval
# CLI flag: -memberlist.pullpush-interval
[pull_push_interval: <duration> | default = 0s]

# How often to gossip. Uses memberlist LAN defaults if 0.
# CLI flag: -<prefix>.memberlist.gossip-interval
# CLI flag: -memberlist.gossip-interval
[gossip_interval: <duration> | default = 0s]

# How many nodes to gossip to. Uses memberlist LAN defaults if 0.
# CLI flag: -<prefix>.memberlist.gossip-nodes
# CLI flag: -memberlist.gossip-nodes
[gossip_nodes: <int> | default = 0]

# Other cluster members to join. Can be specified multiple times. Memberlist
# store is EXPERIMENTAL.
# CLI flag: -<prefix>.memberlist.join
# CLI flag: -memberlist.join
[join_members: <list of string> | default = ]

# If this node fails to join memberlist cluster, abort.
# CLI flag: -<prefix>.memberlist.abort-if-join-fails
# CLI flag: -memberlist.abort-if-join-fails
[abort_if_cluster_join_fails: <boolean> | default = true]

# How long to keep LEFT ingesters in the ring.
# CLI flag: -<prefix>.memberlist.left-ingesters-timeout
# CLI flag: -memberlist.left-ingesters-timeout
[left_ingesters_timeout: <duration> | default = 5m0s]

# Timeout for leaving memberlist cluster.
# CLI flag: -<prefix>.memberlist.leave-timeout
# CLI flag: -memberlist.leave-timeout
[leave_timeout: <duration> | default = 5s]

# IP address to listen on for gossip messages. Multiple addresses may be
# specified. Defaults to 0.0.0.0
# CLI flag: -<prefix>.memberlist.bind-addr
# CLI flag: -memberlist.bind-addr
[bind_addr: <list of string> | default = ]

# Port to listen on for gossip messages.
# CLI flag: -<prefix>.memberlist.bind-port
# CLI flag: -memberlist.bind-port
[bind_port: <int> | default = 7946]

# Timeout used when connecting to other nodes to send packet.
# CLI flag: -<prefix>.memberlist.packet-dial-timeout
# CLI flag: -memberlist.packet-dial-timeout
[packet_dial_timeout: <duration> | default = 5s]

# Timeout for writing 'packet' data.
# CLI flag: -<prefix>.memberlist.packet-write-timeout
# CLI flag: -memberlist.packet-write-timeout
[packet_write_timeout: <duration> | default = 5s]
```

12 changes: 6 additions & 6 deletions integration/framework/scenario.go
@@ -126,7 +126,7 @@ func (s *Scenario) StartDynamoDB() error {

func (s *Scenario) StartDistributor(name string, flags map[string]string, image string) error {
if image == "" {
image = getDefaultCortexImage()
image = GetDefaultCortexImage()
}

return s.StartService(NewService(
@@ -150,7 +150,7 @@ func (s *Scenario) StartDistributor(name string, flags map[string]string, image

func (s *Scenario) StartQuerier(name string, flags map[string]string, image string) error {
if image == "" {
image = getDefaultCortexImage()
image = GetDefaultCortexImage()
}

return s.StartService(NewService(
@@ -173,7 +173,7 @@ func (s *Scenario) StartQuerier(name string, flags map[string]string, image stri

func (s *Scenario) StartIngester(name string, flags map[string]string, image string) error {
if image == "" {
image = getDefaultCortexImage()
image = GetDefaultCortexImage()
}

return s.StartService(NewService(
@@ -201,7 +201,7 @@ func (s *Scenario) StartIngester(name string, flags map[string]string, image str

func (s *Scenario) StartTableManager(name string, flags map[string]string, image string) error {
if image == "" {
image = getDefaultCortexImage()
image = GetDefaultCortexImage()
}

return s.StartService(NewService(
@@ -305,8 +305,8 @@ func existDockerNetwork() (bool, error) {
return strings.TrimSpace(string(out)) != "", nil
}

// getDefaultCortexImage returns the Docker image to use to run Cortex.
func getDefaultCortexImage() string {
// GetDefaultCortexImage returns the Docker image to use to run Cortex.
func GetDefaultCortexImage() string {
// Get the cortex image from the CORTEX_IMAGE env variable,
// falling back to "quay.io/cortexproject/cortex:latest"
if os.Getenv("CORTEX_IMAGE") != "" {
4 changes: 4 additions & 0 deletions integration/framework/service.go
@@ -64,6 +64,10 @@ func NewService(
}
}

// SetBackoff replaces the service's retry backoff with one built from cfg.
func (s *Service) SetBackoff(cfg util.BackoffConfig) {
s.retryBackoff = util.NewBackoff(context.Background(), cfg)
}

func (s *Service) Start() (err error) {
// In case of any error, if the container was already created, we
// have to cleanup removing it. We ignore the error of the "docker rm"
89 changes: 89 additions & 0 deletions integration/integration_memberlist_single_binary_test.go
@@ -0,0 +1,89 @@
package main

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/framework"
"github.com/cortexproject/cortex/pkg/util"
)

func TestSingleBinaryWithMemberlist(t *testing.T) {
s, err := framework.NewScenario()
require.NoError(t, err)
defer s.Shutdown()

// Start dependencies
require.NoError(t, s.StartDynamoDB())
// Look ma, no Consul!
require.NoError(t, s.WaitReady("dynamodb"))

require.NoError(t, startSingleBinary(s, "cortex-1", ""))
require.NoError(t, startSingleBinary(s, "cortex-2", "cortex-1:8000"))
require.NoError(t, startSingleBinary(s, "cortex-3", "cortex-2:8000"))

require.NoError(t, s.WaitReady("cortex-1", "cortex-2", "cortex-3"))

// All three Cortex servers should see each other.
require.NoError(t, s.Service("cortex-1").WaitMetric(80, "memberlist_client_cluster_members_count", 3))
require.NoError(t, s.Service("cortex-2").WaitMetric(80, "memberlist_client_cluster_members_count", 3))
require.NoError(t, s.Service("cortex-3").WaitMetric(80, "memberlist_client_cluster_members_count", 3))

// Each Cortex server should have 512 tokens, 3 * 512 in total.
require.NoError(t, s.Service("cortex-1").WaitMetric(80, "cortex_ring_tokens_total", 3*512))
require.NoError(t, s.Service("cortex-2").WaitMetric(80, "cortex_ring_tokens_total", 3*512))
require.NoError(t, s.Service("cortex-3").WaitMetric(80, "cortex_ring_tokens_total", 3*512))

require.NoError(t, s.StopService("cortex-1"))
require.NoError(t, s.Service("cortex-2").WaitMetric(80, "cortex_ring_tokens_total", 2*512))
require.NoError(t, s.Service("cortex-2").WaitMetric(80, "memberlist_client_cluster_members_count", 2))
require.NoError(t, s.Service("cortex-3").WaitMetric(80, "cortex_ring_tokens_total", 2*512))
require.NoError(t, s.Service("cortex-3").WaitMetric(80, "memberlist_client_cluster_members_count", 2))

require.NoError(t, s.StopService("cortex-2"))
require.NoError(t, s.Service("cortex-3").WaitMetric(80, "cortex_ring_tokens_total", 1*512))
require.NoError(t, s.Service("cortex-3").WaitMetric(80, "memberlist_client_cluster_members_count", 1))

require.NoError(t, s.StopService("cortex-3"))
}

func startSingleBinary(s *framework.Scenario, name string, join string) error {
flags := map[string]string{
"-target": "all", // single-binary mode
"-log.level": "warn",
"-ingester.final-sleep": "0s",
"-ingester.join-after": "0s", // join quickly
"-ingester.min-ready-duration": "0s",
"-ingester.concurrent-flushes": "10",
"-ingester.max-transfer-retries": "0", // disable
"-ingester.num-tokens": "512",
"-ingester.observe-period": "5s", // to avoid conflicts in tokens
"-ring.store": "memberlist",
"-memberlist.bind-port": "8000",
}

if join != "" {
flags["-memberlist.join"] = join
}

serv := framework.NewService(
name,
framework.GetDefaultCortexImage(),
framework.NetworkName,
[]int{80, 8000},
nil,
framework.NewCommandWithoutEntrypoint("cortex", framework.BuildArgs(framework.MergeFlags(ChunksStorage, flags))...),
framework.NewReadinessProbe(80, "/ready", 204),
)

backOff := util.BackoffConfig{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 500 * time.Millisecond, // bump max backoff; things take a little longer with memberlist
MaxRetries: 100,
}

serv.SetBackoff(backOff)
return s.StartService(serv)
}
14 changes: 9 additions & 5 deletions pkg/cortex/cortex.go
@@ -30,6 +30,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
@@ -84,6 +85,7 @@ type Config struct {
ConfigStore config_client.Config `yaml:"config_store,omitempty"`
Alertmanager alertmanager.MultitenantAlertmanagerConfig `yaml:"alertmanager,omitempty"`
RuntimeConfig runtimeconfig.ManagerConfig `yaml:"runtime_config,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
}

// RegisterFlags registers flag.
@@ -119,6 +121,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.ConfigStore.RegisterFlagsWithPrefix("alertmanager.", f)
c.Alertmanager.RegisterFlags(f)
c.RuntimeConfig.RegisterFlags(f)
c.MemberlistKV.RegisterFlags(f, "")

// These don't seem to have a home.
flag.IntVar(&chunk_util.QueryParallelism, "querier.query-parallelism", 100, "Max subqueries run in parallel per higher-level query.")
@@ -171,11 +174,12 @@ type Cortex struct {
cache cache.Cache
runtimeConfig *runtimeconfig.Manager

ruler *ruler.Ruler
configAPI *api.API
configDB db.DB
alertmanager *alertmanager.MultitenantAlertmanager
compactor *compactor.Compactor
ruler *ruler.Ruler
configAPI *api.API
configDB db.DB
alertmanager *alertmanager.MultitenantAlertmanager
compactor *compactor.Compactor
memberlistKVState *memberlistKVState

// The chunk store that the querier should use to query the long
// term storage. It depends on the storage engine used.
34 changes: 34 additions & 0 deletions pkg/cortex/memberlist_kv.go
@@ -0,0 +1,34 @@
package cortex

import (
"sync"

"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
)

// memberlistKVState holds the initialization state of the memberlist.KV instance.
type memberlistKVState struct {
// config used for initialization
cfg *memberlist.KVConfig

// init function, to avoid multiple initializations.
init sync.Once

// state
kv *memberlist.KV
err error
}

func newMemberlistKVState(cfg *memberlist.KVConfig) *memberlistKVState {
return &memberlistKVState{
cfg: cfg,
}
}

func (kvs *memberlistKVState) getMemberlistKV() (*memberlist.KV, error) {
kvs.init.Do(func() {
kvs.kv, kvs.err = memberlist.NewKV(*kvs.cfg)
})

return kvs.kv, kvs.err
}
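
The `memberlistKVState` type above uses `sync.Once` so that several modules can ask for the KV store while only the first call constructs it. The self-contained sketch below shows the same pattern with stand-in types. `resource`, `lazyResource` and `build` are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// resource stands in for memberlist.KV in this sketch.
type resource struct{ id int }

// lazyResource mirrors the shape of memberlistKVState: a sync.Once guards
// construction, and both the result and the error are cached.
type lazyResource struct {
	once  sync.Once
	build func() (*resource, error)

	res *resource
	err error
}

// get builds the resource on the first call and returns the cached
// result (or cached error) on every later call.
func (l *lazyResource) get() (*resource, error) {
	l.once.Do(func() {
		l.res, l.err = l.build()
	})
	return l.res, l.err
}

func main() {
	calls := 0
	l := &lazyResource{build: func() (*resource, error) {
		calls++ // only ever executed inside once.Do
		return &resource{id: calls}, nil
	}}

	// Many concurrent callers still trigger exactly one build.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = l.get()
		}()
	}
	wg.Wait()

	r, _ := l.get()
	fmt.Println(calls, r.id)
}
```

One consequence of this design is that a failed initialization is cached as well: `sync.Once` never reruns the function, so every later caller receives the same error instead of triggering a retry.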