diff --git a/go/config/config.go b/go/config/config.go index 313c3ac6b..33cd087fc 100644 --- a/go/config/config.go +++ b/go/config/config.go @@ -263,7 +263,7 @@ type Configuration struct { GraphitePollSeconds int // Graphite writes interval. 0 disables. URLPrefix string // URL prefix to run orchestrator on non-root web path, e.g. /orchestrator to put it behind nginx. DiscoveryIgnoreReplicaHostnameFilters []string // Regexp filters to apply to prevent auto-discovering new replicas. Usage: unreachable servers due to firewalls, applications which trigger binlog dumps - DiscoveryIgnoreMasterHostnameFilters []string // Regexp filters to apply to prevent auto-discovering a master. Usage: pointing your master temporarily to replicate seom data from external host + DiscoveryIgnoreMasterHostnameFilters []string // Regexp filters to apply to prevent auto-discovering a master. Usage: pointing your master temporarily to replicate some data from external host DiscoveryIgnoreHostnameFilters []string // Regexp filters to apply to prevent discovering instances of any kind ConsulAddress string // Address where Consul HTTP api is found. 
Example: 127.0.0.1:8500 ConsulScheme string // Scheme (http or https) for Consul @@ -275,6 +275,9 @@ type Configuration struct { KVClusterMasterPrefix string // Prefix to use for clusters' masters entries in KV stores (internal, consul, ZK), default: "mysql/master" WebMessage string // If provided, will be shown on all web pages below the title bar MaxConcurrentReplicaOperations int // Maximum number of concurrent operations on replicas + EnforceExactSemiSyncReplicas bool // If true, semi-sync replicas will be enabled/disabled to match the wait count in the desired priority order; this applies to LockedSemiSyncMaster and MasterWithTooManySemiSyncReplicas + RecoverLockedSemiSyncMaster bool // If true, orchestrator will recover from a LockedSemiSync state by enabling semi-sync on replicas to match the wait count; this behavior can be overridden by EnforceExactSemiSyncReplicas + ReasonableStaleBinlogCoordinatesSeconds uint // Time to evaluate the LockedSemiSyncHypothesis before triggering the LockedSemiSync analysis; falls back to ReasonableReplicationLagSeconds if not set } // ToJSONString will marshal this configuration as JSON @@ -446,6 +449,9 @@ func newConfiguration() *Configuration { KVClusterMasterPrefix: "mysql/master", WebMessage: "", MaxConcurrentReplicaOperations: 5, + EnforceExactSemiSyncReplicas: false, + RecoverLockedSemiSyncMaster: false, + ReasonableStaleBinlogCoordinatesSeconds: 0, } } diff --git a/go/inst/analysis.go b/go/inst/analysis.go index 1d6ab3692..a4e525b8b 100644 --- a/go/inst/analysis.go +++ b/go/inst/analysis.go @@ -40,6 +40,7 @@ const ( AllMasterReplicasNotReplicatingOrDead = "AllMasterReplicasNotReplicatingOrDead" LockedSemiSyncMasterHypothesis = "LockedSemiSyncMasterHypothesis" LockedSemiSyncMaster = "LockedSemiSyncMaster" + MasterWithTooManySemiSyncReplicas = "MasterWithTooManySemiSyncReplicas" MasterWithoutReplicas = "MasterWithoutReplicas" DeadCoMaster = "DeadCoMaster" DeadCoMasterAndSomeReplicas = "DeadCoMasterAndSomeReplicas" @@ 
-230,3 +231,10 @@ func (this *ReplicationAnalysis) GetAnalysisInstanceType() AnalysisInstanceType func ValidSecondsFromSeenToLastAttemptedCheck() uint { return config.Config.InstancePollSeconds + config.Config.ReasonableInstanceCheckSeconds } + +func ReasonableStaleBinlogCoordinatesSeconds() uint { + if config.Config.ReasonableStaleBinlogCoordinatesSeconds == 0 { + return uint(config.Config.ReasonableReplicationLagSeconds) + } + return config.Config.ReasonableStaleBinlogCoordinatesSeconds +} diff --git a/go/inst/analysis_dao.go b/go/inst/analysis_dao.go index 4b00281e4..ce1343933 100644 --- a/go/inst/analysis_dao.go +++ b/go/inst/analysis_dao.go @@ -55,7 +55,7 @@ func initializeAnalysisDaoPostConfiguration() { func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) ([]ReplicationAnalysis, error) { result := []ReplicationAnalysis{} - args := sqlutils.Args(config.Config.ReasonableReplicationLagSeconds, ValidSecondsFromSeenToLastAttemptedCheck(), config.Config.ReasonableReplicationLagSeconds, clusterName) + args := sqlutils.Args(ReasonableStaleBinlogCoordinatesSeconds(), ValidSecondsFromSeenToLastAttemptedCheck(), config.Config.ReasonableReplicationLagSeconds, clusterName) analysisQueryReductionClause := `` if config.Config.ReduceReplicationAnalysisCount { @@ -531,6 +531,10 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) a.Description = "Semi sync master seems to be locked, more samplings needed to validate" } // + } else if config.Config.EnforceExactSemiSyncReplicas && a.IsMaster && a.SemiSyncMasterEnabled && a.SemiSyncMasterStatus && a.SemiSyncMasterWaitForReplicaCount > 0 && a.SemiSyncMasterClients > a.SemiSyncMasterWaitForReplicaCount { + a.Analysis = MasterWithTooManySemiSyncReplicas + a.Description = "Semi sync master has more semi sync replicas than configured" + // } else if a.IsMaster && a.LastCheckValid && a.IsReadOnly && a.CountValidReplicatingReplicas > 0 && 
config.Config.RecoverNonWriteableMaster { a.Analysis = NoWriteableMasterStructureWarning a.Description = "Master with replicas is read_only" diff --git a/go/inst/instance.go b/go/inst/instance.go index 5b0d4e115..521895eb8 100644 --- a/go/inst/instance.go +++ b/go/inst/instance.go @@ -94,7 +94,7 @@ type Instance struct { HasReplicationCredentials bool ReplicationCredentialsAvailable bool SemiSyncAvailable bool // when both semi sync plugins (master & replica) are loaded - SemiSyncEnforced bool + SemiSyncPriority uint SemiSyncMasterEnabled bool SemiSyncReplicaEnabled bool SemiSyncMasterTimeout uint64 diff --git a/go/inst/instance_dao.go b/go/inst/instance_dao.go index 04e4168d7..c3c2cab8a 100644 --- a/go/inst/instance_dao.go +++ b/go/inst/instance_dao.go @@ -776,7 +776,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, waitGroup.Add(1) go func() { defer waitGroup.Done() - err := db.QueryRow(config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncEnforced) + err := db.QueryRow(config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncPriority) logReadTopologyInstanceError(instanceKey, "DetectSemiSyncEnforcedQuery", err) }() } @@ -1209,7 +1209,7 @@ func readInstanceRow(m sqlutils.RowMap) *Instance { instance.DataCenter = m.GetString("data_center") instance.Region = m.GetString("region") instance.PhysicalEnvironment = m.GetString("physical_environment") - instance.SemiSyncEnforced = m.GetBool("semi_sync_enforced") + instance.SemiSyncPriority = m.GetUint("semi_sync_enforced") instance.SemiSyncAvailable = m.GetBool("semi_sync_available") instance.SemiSyncMasterEnabled = m.GetBool("semi_sync_master_enabled") instance.SemiSyncMasterTimeout = m.GetUint64("semi_sync_master_timeout") @@ -2610,7 +2610,7 @@ func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bo args = append(args, instance.ReplicationCredentialsAvailable) args = append(args, instance.HasReplicationCredentials) args = append(args, 
instance.AllowTLS) - args = append(args, instance.SemiSyncEnforced) + args = append(args, instance.SemiSyncPriority) args = append(args, instance.SemiSyncAvailable) args = append(args, instance.SemiSyncMasterEnabled) args = append(args, instance.SemiSyncMasterTimeout) diff --git a/go/inst/instance_topology_dao.go b/go/inst/instance_topology_dao.go index 1631fe8eb..3465b430f 100644 --- a/go/inst/instance_topology_dao.go +++ b/go/inst/instance_topology_dao.go @@ -20,6 +20,7 @@ import ( "context" "database/sql" "fmt" + "sort" "strings" "time" @@ -456,13 +457,9 @@ func StartReplication(instanceKey *InstanceKey) (*Instance, error) { // some replicas (those that must never be promoted) should never ACK. // Note: We assume that replicas use 'skip-slave-start' so they won't // START SLAVE on their own upon restart. - if instance.SemiSyncEnforced { - // Send ACK only from promotable instances. - sendACK := instance.PromotionRule != MustNotPromoteRule - // Always disable master setting, in case we're converting a former master. - if err := EnableSemiSync(instanceKey, false, sendACK); err != nil { - return instance, log.Errore(err) - } + instance, err = MaybeEnableSemiSyncReplica(instance) + if err != nil { + return instance, log.Errore(err) } _, err = ExecInstance(instanceKey, `start slave`) @@ -534,7 +531,7 @@ func WaitForExecBinlogCoordinatesToReach(instanceKey *InstanceKey, coordinates * } } -// StartReplicationUntilMasterCoordinates issuesa START SLAVE UNTIL... statement on given instance +// StartReplicationUntilMasterCoordinates issues a START SLAVE UNTIL... 
statement on given instance func StartReplicationUntilMasterCoordinates(instanceKey *InstanceKey, masterCoordinates *BinlogCoordinates) (*Instance, error) { instance, err := ReadTopologyInstance(instanceKey) if err != nil { @@ -550,13 +547,9 @@ func StartReplicationUntilMasterCoordinates(instanceKey *InstanceKey, masterCoor log.Infof("Will start replication on %+v until coordinates: %+v", instanceKey, masterCoordinates) - if instance.SemiSyncEnforced { - // Send ACK only from promotable instances. - sendACK := instance.PromotionRule != MustNotPromoteRule - // Always disable master setting, in case we're converting a former master. - if err := EnableSemiSync(instanceKey, false, sendACK); err != nil { - return instance, log.Errore(err) - } + instance, err = MaybeEnableSemiSyncReplica(instance) + if err != nil { + return instance, log.Errore(err) } // MariaDB has a bug: a CHANGE MASTER TO statement does not work properly with prepared statement... :P @@ -584,14 +577,155 @@ func StartReplicationUntilMasterCoordinates(instanceKey *InstanceKey, masterCoor return instance, err } -// EnableSemiSync sets the rpl_semi_sync_(master|replica)_enabled variables -// on a given instance. -func EnableSemiSync(instanceKey *InstanceKey, master, replica bool) error { - log.Infof("instance %+v rpl_semi_sync_master_enabled: %t, rpl_semi_sync_slave_enabled: %t", instanceKey, master, replica) - _, err := ExecInstance(instanceKey, - `set global rpl_semi_sync_master_enabled = ?, global rpl_semi_sync_slave_enabled = ?`, - master, replica) - return err +// MaybeEnableSemiSyncReplica sets the rpl_semi_sync_(master|replica)_enabled variables on a given instance based on the +// config and state of the world. +func MaybeEnableSemiSyncReplica(replicaInstance *Instance) (*Instance, error) { + // Backwards compatible logic: Enable semi-sync if SemiSyncPriority > 0 (formerly SemiSyncEnforced) + // Note that this logic NEVER enables semi-sync if the promotion rule is "must_not". 
+ if !config.Config.EnforceExactSemiSyncReplicas && !config.Config.RecoverLockedSemiSyncMaster { + if replicaInstance.SemiSyncPriority > 0 { + // Send ACK only from promotable instances; always disable master setting, in case we're converting a former master. + enableReplica := replicaInstance.PromotionRule != MustNotPromoteRule + log.Infof("semi-sync: %+v: setting rpl_semi_sync_master_enabled = %t, rpl_semi_sync_slave_enabled = %t (legacy behavior)", &replicaInstance.Key, false, enableReplica) + _, err := ExecInstance(&replicaInstance.Key, `set global rpl_semi_sync_master_enabled = ?, global rpl_semi_sync_slave_enabled = ?`, false, enableReplica) + if err != nil { + return replicaInstance, log.Errore(err) + } + } + return replicaInstance, nil + } + + // New logic: If EnforceExactSemiSyncReplicas or RecoverLockedSemiSyncMaster are set, we enable semi-sync only if the + // given replica instance is in the list of replicas to have semi-sync enabled (according to the priority). + masterInstance, err := ReadTopologyInstance(&replicaInstance.MasterKey) + if err != nil { + return replicaInstance, log.Errore(err) + } + replicas, err := ReadReplicaInstances(&replicaInstance.MasterKey) + if err != nil { + return replicaInstance, log.Errore(err) + } + + possibleSemiSyncReplicas, asyncReplicas, excludedReplicas := ClassifyAndPrioritizeReplicas(replicas, false) + actions := DetermineSemiSyncReplicaActions(possibleSemiSyncReplicas, asyncReplicas, masterInstance.SemiSyncMasterWaitForReplicaCount, masterInstance.SemiSyncMasterClients, config.Config.EnforceExactSemiSyncReplicas) + + log.Debugf("semi-sync: %+v: determining whether to enable rpl_semi_sync_slave_enabled", replicaInstance.Key) + log.Debugf("semi-sync: master = %+v, master semi-sync wait count = %d, master semi-sync replica count = %d", replicaInstance.MasterKey, masterInstance.SemiSyncMasterWaitForReplicaCount, masterInstance.SemiSyncMasterClients) + LogSemiSyncReplicaAnalysis(possibleSemiSyncReplicas, asyncReplicas, 
excludedReplicas, actions, func(s string, a ...interface{}) { log.Debugf(s, a...) }) + + // TODO This is a little odd: we sometimes get actions for replicas other than replicaInstance. If we do not take those actions, + // TODO we will trigger a MasterWithTooManySemiSyncReplicas analysis right after this recovery. We could prevent that + // TODO by executing all actions here, but this function is scoped to replicaInstance, so we only act on our own instance for now. + for replica, enable := range actions { + if replica.Key.Equals(&replicaInstance.Key) { + log.Infof("semi-sync: %s: setting rpl_semi_sync_slave_enabled: %t", &replicaInstance.Key, enable) + return SetSemiSyncReplica(&replicaInstance.Key, enable) // override "instance", since we re-read it in this function + } + } + + log.Infof("semi-sync: %+v: no action taken", &replicaInstance.Key) + return replicaInstance, nil +} + +// ClassifyAndPrioritizeReplicas takes a list of replica instances and classifies them based on their semi-sync priority, excluding +// replicas that are downtimed or down. It furthermore prioritizes the possible semi-sync replicas based on SemiSyncPriority, PromotionRule +// and hostname (fallback). 
+func ClassifyAndPrioritizeReplicas(replicas []*Instance, excludeNotReplicatingReplicas bool) (possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance, excludedReplicas []*Instance) { + // Filter out downtimed and down replicas + possibleSemiSyncReplicas = make([]*Instance, 0) + asyncReplicas = make([]*Instance, 0) + excludedReplicas = make([]*Instance, 0) + for _, replica := range replicas { + if replica.IsDowntimed || !replica.IsLastCheckValid || (excludeNotReplicatingReplicas && !replica.ReplicaRunning()) { + excludedReplicas = append(excludedReplicas, replica) + } else if replica.SemiSyncPriority == 0 { + asyncReplicas = append(asyncReplicas, replica) + } else { + possibleSemiSyncReplicas = append(possibleSemiSyncReplicas, replica) + } + } + + // Sort replicas by priority, promotion rule and name + sort.Slice(possibleSemiSyncReplicas, func(i, j int) bool { + if possibleSemiSyncReplicas[i].SemiSyncPriority != possibleSemiSyncReplicas[j].SemiSyncPriority { + return possibleSemiSyncReplicas[i].SemiSyncPriority < possibleSemiSyncReplicas[j].SemiSyncPriority + } + if possibleSemiSyncReplicas[i].PromotionRule != possibleSemiSyncReplicas[j].PromotionRule { + return possibleSemiSyncReplicas[i].PromotionRule.BetterThan(possibleSemiSyncReplicas[j].PromotionRule) + } + return strings.Compare(possibleSemiSyncReplicas[i].Key.String(), possibleSemiSyncReplicas[j].Key.String()) < 0 + }) + + return +} + +// DetermineSemiSyncReplicaActions returns a map of replicas for which to change the semi-sync replica setting. +// A value of true indicates semi-sync needs to be enabled, false that it needs to be disabled. 
+func DetermineSemiSyncReplicaActions(possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance, waitCount uint, currentSemiSyncReplicas uint, exactReplicaTopology bool) map[*Instance]bool { + if exactReplicaTopology { + return determineSemiSyncReplicaActionsForExactTopology(possibleSemiSyncReplicas, asyncReplicas, waitCount) + } + return determineSemiSyncReplicaActionsForEnoughTopology(possibleSemiSyncReplicas, waitCount, currentSemiSyncReplicas) +} + +func determineSemiSyncReplicaActionsForExactTopology(possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance, waitCount uint) map[*Instance]bool { + actions := make(map[*Instance]bool, 0) // true = enable semi-sync, false = disable semi-sync + for i, replica := range possibleSemiSyncReplicas { + isSemiSyncEnabled := replica.SemiSyncReplicaEnabled + shouldSemiSyncBeEnabled := uint(i) < waitCount + if shouldSemiSyncBeEnabled && !isSemiSyncEnabled { + actions[replica] = true + } else if !shouldSemiSyncBeEnabled && isSemiSyncEnabled { + actions[replica] = false + } + } + for _, replica := range asyncReplicas { + if replica.SemiSyncReplicaEnabled { + actions[replica] = false + } + } + return actions +} + +func determineSemiSyncReplicaActionsForEnoughTopology(possibleSemiSyncReplicas []*Instance, waitCount uint, currentSemiSyncReplicas uint) map[*Instance]bool { + actions := make(map[*Instance]bool, 0) // true = enable semi-sync, false = disable semi-sync + enabled := uint(0) + for _, replica := range possibleSemiSyncReplicas { + if !replica.SemiSyncReplicaEnabled { + actions[replica] = true + enabled++ + } + if enabled == waitCount-currentSemiSyncReplicas { + break + } + } + return actions +} + +// LogSemiSyncReplicaAnalysis outputs the analysis results for a semi-sync analysis using the given log function +func LogSemiSyncReplicaAnalysis(possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance, excludedReplicas []*Instance, actions map[*Instance]bool, logf func(s string, a ...interface{})) { + 
logReplicas("possible semi-sync replicas (in priority order)", possibleSemiSyncReplicas, logf) + logReplicas("always-async replicas", asyncReplicas, logf) + logReplicas("excluded replicas (downtimed/defunct)", excludedReplicas, logf) + if len(actions) > 0 { + logf("semi-sync: suggested actions:") + for replica, enable := range actions { + logf("semi-sync: - %+v: should set semi-sync enabled = %t", replica.Key, enable) + } + } else { + logf("semi-sync: suggested actions: (none)") + } +} + +func logReplicas(description string, replicas []*Instance, logf func(s string, a ...interface{})) { + if len(replicas) > 0 { + logf("semi-sync: %s:", description) + for _, replica := range replicas { + logf("semi-sync: - %s: semi-sync enabled = %t, priority = %d, promotion rule = %s, downtimed = %t, last check = %t, replicating = %t", replica.Key.String(), replica.SemiSyncReplicaEnabled, replica.SemiSyncPriority, replica.PromotionRule, replica.IsDowntimed, replica.IsLastCheckValid, replica.ReplicaRunning()) + } + } else { + logf("semi-sync: %s: (none)", description) + } } // DelayReplication set the replication delay given seconds @@ -1090,10 +1224,8 @@ func SetReadOnly(instanceKey *InstanceKey, readOnly bool) (*Instance, error) { // If async fallback is disallowed, we're responsible for flipping the master // semi-sync switch ON before accepting writes. The setting is off by default. - if instance.SemiSyncEnforced && !readOnly { - // Send ACK only from promotable instances. - sendACK := instance.PromotionRule != MustNotPromoteRule - if err := EnableSemiSync(instanceKey, true, sendACK); err != nil { + if instance.SemiSyncPriority > 0 && !readOnly { // TODO I feel like this flag is a little overloaded: this is used here to determine if the MASTER's semi-sync is enabled... 
+ if _, err := SetSemiSyncMaster(instanceKey, true); err != nil { return instance, log.Errore(err) } } @@ -1111,13 +1243,14 @@ func SetReadOnly(instanceKey *InstanceKey, readOnly bool) (*Instance, error) { } } instance, err = ReadTopologyInstance(instanceKey) + if err != nil { + return instance, log.Errore(err) + } // If we just went read-only, it's safe to flip the master semi-sync switch // OFF, which is the default value so that replicas can make progress. - if instance.SemiSyncEnforced && readOnly { - // Send ACK only from promotable instances. - sendACK := instance.PromotionRule != MustNotPromoteRule - if err := EnableSemiSync(instanceKey, false, sendACK); err != nil { + if instance.SemiSyncPriority > 0 && readOnly { // TODO I feel like this flag is a little overloaded: this is used here to determine if the MASTER's semi-sync is enabled... + if _, err := SetSemiSyncMaster(instanceKey, false); err != nil { return instance, log.Errore(err) } } diff --git a/go/logic/topology_recovery.go b/go/logic/topology_recovery.go index 39a46bfa5..b182cb4dd 100644 --- a/go/logic/topology_recovery.go +++ b/go/logic/topology_recovery.go @@ -1479,14 +1479,97 @@ func checkAndRecoverNonWriteableMaster(analysisEntry inst.ReplicationAnalysis, c // checkAndRecoverLockedSemiSyncMaster func checkAndRecoverLockedSemiSyncMaster(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, true, true) if topologyRecovery == nil { AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. 
Will not issue another RecoverLockedSemiSyncMaster.", analysisEntry.AnalyzedInstanceKey)) return false, nil, err } + if config.Config.EnforceExactSemiSyncReplicas { + return recoverSemiSyncReplicas(topologyRecovery, analysisEntry, true) + } + if !config.Config.RecoverLockedSemiSyncMaster { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("no action taken to recover locked semi sync master on %+v. Enable RecoverLockedSemiSyncMaster or EnforceExactSemiSyncReplicas to change this behavior.", analysisEntry.AnalyzedInstanceKey)) + return false, nil, err + } + return recoverSemiSyncReplicas(topologyRecovery, analysisEntry, false) +} - return false, nil, nil +// checkAndRecoverMasterWithTooManySemiSyncReplicas +func checkAndRecoverMasterWithTooManySemiSyncReplicas(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { + topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, true, true) + if topologyRecovery == nil { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. 
Will not issue another RecoverMasterWithTooManySemiSyncReplicas.", analysisEntry.AnalyzedInstanceKey)) + return false, nil, err + } + return recoverSemiSyncReplicas(topologyRecovery, analysisEntry, true) +} + +func recoverSemiSyncReplicas(topologyRecovery *TopologyRecovery, analysisEntry inst.ReplicationAnalysis, exactReplicaTopology bool) (recoveryAttempted bool, topologyRecoveryOut *TopologyRecovery, err error) { + masterInstance, found, err := inst.ReadInstance(&analysisEntry.AnalyzedInstanceKey) + if !found || err != nil { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: instance not found in recovery %+v.", analysisEntry.AnalyzedInstanceKey)) + return false, topologyRecovery, err + } + + // Read all replicas + replicas, err := inst.ReadReplicaInstances(&analysisEntry.AnalyzedInstanceKey) + if err != nil { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: could not read replica instances for %+v: %s", analysisEntry.AnalyzedInstanceKey, err.Error())) + return false, topologyRecovery, err + } + if len(replicas) == 0 { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: no replicas found for %+v; cannot recover", analysisEntry.AnalyzedInstanceKey)) + return false, topologyRecovery, nil + } + + // Classify and prioritize replicas & figure out which replicas need to be acted upon + possibleSemiSyncReplicas, asyncReplicas, excludedReplicas := inst.ClassifyAndPrioritizeReplicas(replicas, true) + actions := inst.DetermineSemiSyncReplicaActions(possibleSemiSyncReplicas, asyncReplicas, analysisEntry.SemiSyncMasterWaitForReplicaCount, analysisEntry.SemiSyncMasterClients, exactReplicaTopology) + + // Log analysis + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: analysis results for recovery of %+v:", masterInstance.Key)) + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: semi-sync wait count = %d, semi-sync replica count = %d", analysisEntry.SemiSyncMasterWaitForReplicaCount, 
analysisEntry.SemiSyncMasterClients)) + inst.LogSemiSyncReplicaAnalysis(possibleSemiSyncReplicas, asyncReplicas, excludedReplicas, actions, func(s string, a ...interface{}) { AuditTopologyRecovery(topologyRecovery, fmt.Sprintf(s, a...)) }) + + // Bail out if we cannot succeed + if uint(len(possibleSemiSyncReplicas)) < analysisEntry.SemiSyncMasterWaitForReplicaCount { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: not enough valid live replicas found to recover from %s on %+v.", analysisEntry.AnalysisString(), analysisEntry.AnalyzedInstanceKey)) + return true, topologyRecovery, nil // TODO recoveryAttempted = true; is this correct? what are the implications of this? + } + if len(actions) == 0 { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: cannot determine actions based on possible semi-sync replicas; cannot recover from %s on %+v.", analysisEntry.AnalysisString(), analysisEntry.AnalyzedInstanceKey)) + return true, topologyRecovery, nil // TODO recoveryAttempted = true; is this correct? what are the implications of this? + } + + // Take action + AuditTopologyRecovery(topologyRecovery, "semi-sync: taking actions:") + for replica, enable := range actions { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: - %s: setting rpl_semi_sync_slave_enabled=%t, restarting slave_io thread", replica.Key.String(), enable)) + _, err := inst.SetSemiSyncReplica(&replica.Key, enable) + if err != nil { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: cannot change semi sync on replica %+v.", replica.Key)) + return true, topologyRecovery, nil + } + } + + // Re-read source instance to avoid re-triggering the same condition + // TODO even though we resolve correctly here, we are re-triggering the same analysis until the next polling interval. WHY? 
+ + resolveRecovery(topologyRecovery, masterInstance) + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: recovery complete; success = %t", topologyRecovery.IsSuccessful)) + + return true, topologyRecovery, nil +} + +func logReplicas(topologyRecovery *TopologyRecovery, description string, replicas []*inst.Instance) { + if len(replicas) > 0 { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("%s:", description)) + for _, replica := range replicas { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- %s: semi-sync enabled = %t, priority = %d, promotion rule = %s, downtimed = %t, last check = %t, replicating = %t", replica.Key.String(), replica.SemiSyncReplicaEnabled, replica.SemiSyncPriority, replica.PromotionRule, replica.IsDowntimed, replica.IsLastCheckValid, replica.ReplicaRunning())) + } + } else { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("%s: (none)", description)) + } } // checkAndRecoverGenericProblem is a general-purpose recovery function @@ -1659,6 +1742,8 @@ func getCheckAndRecoverFunction(analysisCode inst.AnalysisCode, analyzedInstance } else { return checkAndRecoverLockedSemiSyncMaster, true } + case inst.MasterWithTooManySemiSyncReplicas: + return checkAndRecoverMasterWithTooManySemiSyncReplicas, true // intermediate master case inst.DeadIntermediateMaster: return checkAndRecoverDeadIntermediateMaster, true