diff --git a/go/config/config.go b/go/config/config.go
index 7cdbbf353..e7ddeaa80 100644
--- a/go/config/config.go
+++ b/go/config/config.go
@@ -256,6 +256,7 @@ type Configuration struct {
 	DiscoveryIgnoreReplicaHostnameFilters []string // Regexp filters to apply to prevent auto-discovering new replicas. Usage: unreachable servers due to firewalls, applications which trigger binlog dumps
 	ConsulAddress                         string   // Address where Consul HTTP api is found. Example: 127.0.0.1:8500
 	ConsulAclToken                        string   // ACL token used to write to Consul KV
+	ConsulCrossDataCenterDistribution     bool     // should orchestrator automatically auto-deduce all consul DCs and write KVs in all DCs
 	ZkAddress                             string   // UNSUPPERTED YET. Address where (single or multiple) ZooKeeper servers are found, in `srv1[:port1][,srv2[:port2]...]` format. Default port is 2181. Example: srv-a,srv-b:12181,srv-c
 	KVClusterMasterPrefix                 string   // Prefix to use for clusters' masters entries in KV stores (internal, consul, ZK), default: "mysql/master"
 	WebMessage                            string   // If provided, will be shown on all web pages below the title bar
@@ -417,6 +418,7 @@ func newConfiguration() *Configuration {
 		DiscoveryIgnoreReplicaHostnameFilters: []string{},
 		ConsulAddress:                         "",
 		ConsulAclToken:                        "",
+		ConsulCrossDataCenterDistribution:     false,
 		ZkAddress:                             "",
 		KVClusterMasterPrefix:                 "mysql/master",
 		WebMessage:                            "",
diff --git a/go/kv/consul.go b/go/kv/consul.go
index fab3f470b..f0401caf0 100644
--- a/go/kv/consul.go
+++ b/go/kv/consul.go
@@ -70,3 +70,25 @@ func (this *consulStore) AddKeyValue(key string, value string) (added bool, err
 	err = this.PutKeyValue(key, value)
 	return (err != nil), err
 }
+
+func (this *consulStore) DistributePairs(kvPairs [](*KVPair)) (err error) {
+	if config.Config.ConsulCrossDataCenterDistribution {
+		datacenters, err := this.client.Catalog().Datacenters()
+		if err != nil {
+			return err
+		}
+		consulPairs := [](*consulapi.KVPair){}
+		for _, kvPair := range kvPairs {
+			consulPairs = append(consulPairs, &consulapi.KVPair{Key: kvPair.Key, Value: []byte(kvPair.Value)})
+		}
+		for _, datacenter := range datacenters {
+			writeOptions := &consulapi.WriteOptions{Datacenter: datacenter}
+			for _, consulPair := range consulPairs {
+				if _, e := this.client.KV().Put(consulPair, writeOptions); e != nil {
+					err = e
+				}
+			}
+		}
+	}
+	return err
+}
diff --git a/go/kv/internal.go b/go/kv/internal.go
index 85616f0e9..926c4be70 100644
--- a/go/kv/internal.go
+++ b/go/kv/internal.go
@@ -81,3 +81,7 @@ func (this *internalKVStore) AddKeyValue(key string, value string) (added bool,
 	}
 	return (rowsAffected > 0), nil
 }
+
+func (this *internalKVStore) DistributePairs(pairs [](*KVPair)) (err error) {
+	return nil
+}
diff --git a/go/kv/kv.go b/go/kv/kv.go
index a8a140607..8e2663cb6 100644
--- a/go/kv/kv.go
+++ b/go/kv/kv.go
@@ -37,8 +37,8 @@ func (this *KVPair) String() string {
 
 type KVStore interface {
 	PutKeyValue(key string, value string) (err error)
 	GetKeyValue(key string) (value string, found bool, err error)
-	AddKeyValue(key string, value string) (added bool, err error)
+	DistributePairs(pairs [](*KVPair)) (err error)
 }
 
 var kvMutex sync.Mutex
@@ -111,3 +111,12 @@ func AddKVPair(kvPair *KVPair) (err error) {
 	}
 	return AddValue(kvPair.Key, kvPair.Value)
 }
+
+func DistributePairs(pairs [](*KVPair)) (err error) {
+	for _, store := range getKVStores() {
+		if err := store.DistributePairs(pairs); err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/go/kv/zk.go b/go/kv/zk.go
index b11e2e5bc..339508e60 100644
--- a/go/kv/zk.go
+++ b/go/kv/zk.go
@@ -79,3 +79,7 @@ func (this *zkStore) AddKeyValue(key string, value string) (added bool, err erro
 	err = this.PutKeyValue(key, value)
 	return (err != nil), err
 }
+
+func (this *zkStore) DistributePairs(pairs [](*KVPair)) (err error) {
+	return nil
+}
diff --git a/go/logic/orchestrator.go b/go/logic/orchestrator.go
index 08a661bbe..f3c1fc407 100644
--- a/go/logic/orchestrator.go
+++ b/go/logic/orchestrator.go
@@ -411,16 +411,12 @@ func InjectPseudoGTIDOnWriters() error {
 // stores are updated via failovers.
 func SubmitMastersToKvStores(clusterName string, force bool) (kvPairs [](*kv.KVPair), submittedCount int, err error) {
 	kvPairs, err = inst.GetMastersKVPairs(clusterName)
+	log.Debugf("kv.SubmitMastersToKvStores, clusterName: %s, force: %+v: numPairs: %+v", clusterName, force, len(kvPairs))
 	if err != nil {
 		return kvPairs, submittedCount, log.Errore(err)
 	}
-	command := "add-key-value"
-	applyFunc := kv.AddKVPair
-	if force {
-		command = "put-key-value"
-		applyFunc = kv.PutKVPair
-	}
 	var selectedError error
+	var submitKvPairs [](*kv.KVPair)
 	for _, kvPair := range kvPairs {
 		if !force {
 			// !force: Called periodically to auto-populate KV
@@ -429,16 +425,21 @@ func SubmitMastersToKvStores(clusterName string, force bool) (kvPairs [](*kv.KVP
 				// Let's not overload database with queries. Let's not overload raft with events.
 				continue
 			}
-			if v, found, err := kv.GetValue(kvPair.Key); err == nil && found && v == kvPair.Value {
+			v, found, err := kv.GetValue(kvPair.Key)
+			if err == nil && found && v == kvPair.Value {
 				// Already has the right value.
 				kvFoundCache.Set(kvPair.Key, true, cache.DefaultExpiration)
 				continue
 			}
 		}
+		submitKvPairs = append(submitKvPairs, kvPair)
+	}
+	log.Debugf("kv.SubmitMastersToKvStores: submitKvPairs: %+v", len(submitKvPairs))
+	for _, kvPair := range submitKvPairs {
 		if orcraft.IsRaftEnabled() {
-			_, err = orcraft.PublishCommand(command, kvPair)
+			_, err = orcraft.PublishCommand("put-key-value", kvPair)
 		} else {
-			err = applyFunc(kvPair)
+			err = kv.PutKVPair(kvPair)
 		}
 		if err == nil {
 			submittedCount++
@@ -446,6 +447,7 @@ func SubmitMastersToKvStores(clusterName string, force bool) (kvPairs [](*kv.KVP
 			selectedError = err
 		}
 	}
+	kv.DistributePairs(submitKvPairs)
 	return kvPairs, submittedCount, log.Errore(selectedError)
 }
 
diff --git a/go/logic/topology_recovery.go b/go/logic/topology_recovery.go
index d354567d1..68aa95bab 100644
--- a/go/logic/topology_recovery.go
+++ b/go/logic/topology_recovery.go
@@ -878,7 +878,10 @@ func checkAndRecoverDeadMaster(analysisEntry inst.ReplicationAnalysis, candidate
 				log.Errore(err)
 			}
 		}
-
+		{
+			err := kv.DistributePairs(kvPairs)
+			log.Errore(err)
+		}
 		if !skipProcesses {
 			// Execute post master-failover processes
 			executeProcesses(config.Config.PostMasterFailoverProcesses, "PostMasterFailoverProcesses", topologyRecovery, false)
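The consul.go hunk is the core of the change: when ConsulCrossDataCenterDistribution is enabled, DistributePairs asks the Consul catalog for every known datacenter and replays the same KV pairs against each one via WriteOptions.Datacenter. For reviewers who want to exercise that technique in isolation, here is a minimal standalone sketch against the HashiCorp consul api client; it is not part of the patch, and the agent address, key, and value are illustrative placeholders rather than values taken from this diff.

// distribute_sketch.go: standalone sketch (not part of this patch) of the
// cross-datacenter KV write technique used by consulStore.DistributePairs.
package main

import (
	"log"

	consulapi "github.com/hashicorp/consul/api"
)

func main() {
	config := consulapi.DefaultConfig()
	config.Address = "127.0.0.1:8500" // assumed local agent; orchestrator takes this from ConsulAddress
	client, err := consulapi.NewClient(config)
	if err != nil {
		log.Fatal(err)
	}
	// Auto-deduce all datacenters known to this Consul deployment.
	datacenters, err := client.Catalog().Datacenters()
	if err != nil {
		log.Fatal(err)
	}
	// A hypothetical master entry under the default KVClusterMasterPrefix layout.
	pair := &consulapi.KVPair{Key: "mysql/master/mycluster", Value: []byte("db-master-01:3306")}
	for _, dc := range datacenters {
		// Direct the write at one specific datacenter per iteration.
		writeOptions := &consulapi.WriteOptions{Datacenter: dc}
		if _, err := client.KV().Put(pair, writeOptions); err != nil {
			log.Printf("put in datacenter %s failed: %v", dc, err)
		}
	}
}

Like the patch, the sketch keeps writing to the remaining datacenters when one Put fails instead of aborting on the first error.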