go/inst/analysis.go: 3 changes (2 additions & 1 deletion)
@@ -40,7 +40,7 @@ const (
 	AllMasterReplicasNotReplicatingOrDead = "AllMasterReplicasNotReplicatingOrDead"
 	LockedSemiSyncMasterHypothesis = "LockedSemiSyncMasterHypothesis"
 	LockedSemiSyncMaster = "LockedSemiSyncMaster"
-	MasterWithTooManySemiSyncReplicas = "MasterWithTooManySemiSyncReplicas"
+	MasterWithIncorrectSemiSyncReplicas = "MasterWithIncorrectSemiSyncReplicas"
 	MasterWithoutReplicas = "MasterWithoutReplicas"
 	DeadCoMaster = "DeadCoMaster"
 	DeadCoMasterAndSomeReplicas = "DeadCoMasterAndSomeReplicas"
@@ -154,6 +154,7 @@ type ReplicationAnalysis struct {
 	SemiSyncMasterStatus bool
 	SemiSyncMasterWaitForReplicaCount uint
 	SemiSyncMasterClients uint
+	SemiSyncReplicaTopologyValid bool
 	CountSemiSyncReplicasEnabled uint
 	CountLoggingReplicas uint
 	CountStatementBasedLoggingReplicas uint

go/inst/analysis_dao.go: 90 changes (83 additions & 7 deletions)
@@ -19,6 +19,8 @@ package inst
 import (
 	"fmt"
 	"regexp"
+	"strconv"
+	"strings"
 	"time"
 
 	"github.com/openark/orchestrator/go/config"
@@ -55,7 +57,7 @@ func initializeAnalysisDaoPostConfiguration() {
 func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) ([]ReplicationAnalysis, error) {
 	result := []ReplicationAnalysis{}
 
-	args := sqlutils.Args(ReasonableLockedSemiSyncMasterSeconds(), ValidSecondsFromSeenToLastAttemptedCheck(), config.Config.ReasonableReplicationLagSeconds, clusterName)
+	args := sqlutils.Args(ReasonableLockedSemiSyncMasterSeconds(), ValidSecondsFromSeenToLastAttemptedCheck(), ValidSecondsFromSeenToLastAttemptedCheck(), config.Config.ReasonableReplicationLagSeconds, clusterName)
 	analysisQueryReductionClause := ``
 
 	if config.Config.ReduceReplicationAnalysisCount {
@@ -222,7 +224,20 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
 						':',
 						replica_instance.Port
 					)
-				) as slave_hosts,
+				) as replicas,
+				GROUP_CONCAT(
+					concat(
+						replica_instance.Hostname, ':', replica_instance.Port, ',',
+						replica_instance.semi_sync_replica_enabled, ',',
+						replica_instance.semi_sync_enforced, ',',
+						replica_downtime.downtime_active is not null and ifnull(replica_downtime.end_timestamp, now()) > now(), ',',
+						replica_instance.last_checked <= replica_instance.last_seen and replica_instance.last_attempted_check <= replica_instance.last_seen + interval ? second, ',',
+						replica_candidate_instance.promotion_rule, ',',
+						replica_instance.replication_sql_thread_state, ',',
+						replica_instance.replication_io_thread_state
+					)
+					separator ' '
+				) as replica_details,
 				MIN(
 					master_instance.slave_sql_running = 1
 					AND master_instance.slave_io_running = 0
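
Note: the new "?" placeholder in the last-check expression above is bound by the second ValidSecondsFromSeenToLastAttemptedCheck() argument added to sqlutils.Args earlier in this function. For reference, a hypothetical replica_details value for two replicas would look like this, with entries space-separated and fields comma-separated in the order concatenated above (host:port, semi_sync_replica_enabled, semi_sync_enforced, downtimed, last-check-valid, promotion_rule, SQL thread state, IO thread state):

	replica-1:3306,1,2,0,1,prefer,1,1 replica-2:3306,0,0,0,1,neutral,1,1
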
@@ -371,6 +386,10 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
 				) = replica_instance.master_host
 				AND master_instance.port = replica_instance.master_port
 			)
+			LEFT JOIN candidate_database_instance as replica_candidate_instance ON (
+				replica_instance.hostname = replica_candidate_instance.hostname
+				AND replica_instance.port = replica_candidate_instance.port
+			)
 			LEFT JOIN database_instance_maintenance ON (
 				master_instance.hostname = database_instance_maintenance.hostname
 				AND master_instance.port = database_instance_maintenance.port
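
Note: as far as the visible hunks show, this join exists so the replica_details blob above can include each replica's promotion_rule via the replica_candidate_instance alias.
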
@@ -452,7 +471,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
 		a.ClusterDetails.ReadRecoveryInfo()
 
 		a.Replicas = *NewInstanceKeyMap()
-		a.Replicas.ReadCommaDelimitedList(m.GetString("slave_hosts"))
+		a.Replicas.ReadCommaDelimitedList(m.GetString("replicas"))
 
 		countValidOracleGTIDReplicas := m.GetUint("count_valid_oracle_gtid_replicas")
 		a.OracleGTIDImmediateTopology = countValidOracleGTIDReplicas == a.CountValidReplicas && a.CountValidReplicas > 0
Expand All @@ -467,6 +486,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
// countValidSemiSyncReplicasEnabled := m.GetUint("count_valid_semi_sync_replicas")
a.SemiSyncMasterWaitForReplicaCount = m.GetUint("semi_sync_master_wait_for_slave_count")
a.SemiSyncMasterClients = m.GetUint("semi_sync_master_clients")
a.SemiSyncReplicaTopologyValid = isSemiSyncReplicaTopologyValid(m.GetString("replica_details"), &a.AnalyzedInstanceKey, a.SemiSyncMasterWaitForReplicaCount, a.SemiSyncMasterClients)

a.MinReplicaGTIDMode = m.GetString("min_replica_gtid_mode")
a.MaxReplicaGTIDMode = m.GetString("max_replica_gtid_mode")
@@ -522,6 +542,10 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
 			a.Analysis = UnreachableMaster
 			a.Description = "Master cannot be reached by orchestrator but it has replicating replicas; possibly a network/host issue"
 			//
+		} else if config.Config.EnforceExactSemiSyncReplicas && a.IsMaster && a.SemiSyncMasterEnabled && a.SemiSyncMasterStatus && (!a.SemiSyncReplicaTopologyValid || (a.SemiSyncMasterWaitForReplicaCount > 0 && a.SemiSyncMasterClients != a.SemiSyncMasterWaitForReplicaCount)) {
+			a.Analysis = MasterWithIncorrectSemiSyncReplicas
+			a.Description = "Semi sync master has incorrect semi sync replicas configured"
+			//
 		} else if a.IsMaster && a.SemiSyncMasterEnabled && a.SemiSyncMasterStatus && a.SemiSyncMasterWaitForReplicaCount > 0 && a.SemiSyncMasterClients < a.SemiSyncMasterWaitForReplicaCount {
 			if isStaleBinlogCoordinates {
 				a.Analysis = LockedSemiSyncMaster
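
Note: with EnforceExactSemiSyncReplicas enabled, the new branch fires when the number of connected semi-sync replicas deviates from rpl_semi_sync_master_wait_for_slave_count in either direction, or when the count matches but the wrong replicas have semi-sync enabled (SemiSyncReplicaTopologyValid is false). For example, with a wait count of 2, three connected semi-sync replicas, and equally two connected replicas of which one should be async, both resolve to MasterWithIncorrectSemiSyncReplicas. Since too few clients also implies an unequal count, the pre-existing LockedSemiSyncMaster branch below remains reachable only when exact enforcement is disabled.
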
@@ -531,10 +555,6 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
 				a.Description = "Semi sync master seems to be locked, more samplings needed to validate"
 			}
 			//
-		} else if config.Config.EnforceExactSemiSyncReplicas && a.IsMaster && a.SemiSyncMasterEnabled && a.SemiSyncMasterStatus && a.SemiSyncMasterWaitForReplicaCount > 0 && a.SemiSyncMasterClients > a.SemiSyncMasterWaitForReplicaCount {
-			a.Analysis = MasterWithTooManySemiSyncReplicas
-			a.Description = "Semi sync master has more semi sync replicas than configured"
-			//
 		} else if a.IsMaster && a.LastCheckValid && a.IsReadOnly && a.CountValidReplicatingReplicas > 0 && config.Config.RecoverNonWriteableMaster {
 			a.Analysis = NoWriteableMasterStructureWarning
 			a.Description = "Master with replicas is read_only"
@@ -725,6 +745,62 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
 	return result, log.Errore(err)
 }
 
+func isSemiSyncReplicaTopologyValid(replicaDetails string, masterKey *InstanceKey, waitCount uint, currentSemiSyncReplicas uint) bool {
+	if !config.Config.EnforceExactSemiSyncReplicas {
+		return true // don't bother if we won't correct the semi-sync topology anyway
+	}
+
+	replicas, err := parseReplicaDetails(replicaDetails, masterKey)
+	if err != nil {
+		log.Warningf("semi-sync: unable to parse replica details: %s", err.Error())
+		return true // fall back to "everything's fine"
+	}
+	possibleSemiSyncReplicas, asyncReplicas, _ := ClassifyAndPrioritizeReplicas(replicas, nil)
+	actions := DetermineSemiSyncReplicaActions(possibleSemiSyncReplicas, asyncReplicas, waitCount, currentSemiSyncReplicas, true)
+	return len(actions) == 0
+}
+
+// parseReplicaDetails parses the space-separated replica details returned by the giant query (see above)
+func parseReplicaDetails(replicaDetails string, masterKey *InstanceKey) ([]*Instance, error) {
+	// TODO this should NOT return `Instance` because that will create the illusion that all fields and functions are
+	// TODO available; if Shlomi agrees with this approach in general, I'll make a `ReplicaInstance` struct or something like that
+	replicas := make([]*Instance, 0)
+	if replicaDetails == "" {
+		return replicas, nil
+	}
+	detailsList := strings.Split(replicaDetails, " ")
+	for _, details := range detailsList {
+		replica := NewInstance()
+		replica.MasterKey = *masterKey
+		parts := strings.Split(details, ",")
+		if len(parts) != 8 {
+			return nil, log.Errorf("unable to parse replica details: %s", details)
+		}
+		instanceKey, err := ParseRawInstanceKey(parts[0])
+		if err != nil {
+			return nil, log.Errorf("unable to parse instance key: %s", err.Error())
+		}
+		replica.Key.Hostname = instanceKey.Hostname
+		replica.Key.Port = instanceKey.Port
+		replica.SemiSyncReplicaEnabled = parts[1] == "1"
+		priority, err := strconv.Atoi(parts[2])
+		if err != nil {
+			return nil, log.Errorf("unable to parse semi-sync priority: %s", err.Error())
+		}
+		replica.SemiSyncPriority = uint(priority)
+		replica.IsDowntimed = parts[3] == "1"
+		replica.IsLastCheckValid = parts[4] == "1"
+		replica.PromotionRule, err = ParseCandidatePromotionRule(parts[5])
+		if err != nil {
+			return nil, log.Errorf("unable to parse promotion rule: %s", err.Error())
+		}
+		replica.ReplicationSQLThreadRuning = parts[6] == "1"
+		replica.ReplicationIOThreadRuning = parts[7] == "1"
+		replicas = append(replicas, replica)
+	}
+	return replicas, nil
+}
+
 func getConcensusReplicationAnalysis(analysisEntries []ReplicationAnalysis) ([]ReplicationAnalysis, error) {
 	if !orcraft.IsRaftEnabled() {
 		return analysisEntries, nil
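
To make the new parsing path concrete, here is a minimal test-style sketch of the replica_details round trip; hostnames and values are hypothetical, and the function would have to live in package inst next to the helpers above. Note that the SQL thread state and IO thread state come last, matching the order in which the query concatenates them:

	package inst

	import "fmt"

	func ExampleParseReplicaDetails() {
		masterKey := &InstanceKey{Hostname: "master-1", Port: 3306}
		// replica-1: semi-sync enabled, priority 2, replicating, promotion rule "prefer"
		// replica-2: always-async (priority 0), replicating, promotion rule "neutral"
		details := "replica-1:3306,1,2,0,1,prefer,1,1 replica-2:3306,0,0,0,1,neutral,1,1"
		replicas, err := parseReplicaDetails(details, masterKey)
		if err != nil {
			panic(err)
		}
		possible, async, excluded := ClassifyAndPrioritizeReplicas(replicas, nil)
		fmt.Printf("possible=%d async=%d excluded=%d\n", len(possible), len(async), len(excluded))
		// Output: possible=1 async=1 excluded=0
	}
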

go/inst/instance_topology_dao.go: 17 changes (9 additions & 8 deletions)
@@ -645,17 +645,18 @@ func MaybeEnableSemiSyncReplica(replicaInstance *Instance) (*Instance, error) {
 }
 
 // ClassifyAndPrioritizeReplicas takes a list of replica instances and classifies them based on their semi-sync priority, excluding
-// replicas that are downtimed or down. It furthermore prioritizes the possible semi-sync replicas based on SemiSyncPriority, PromotionRule
-// and hostname (fallback).
+// replicas that are down or not replicating. Downtimed replicas are treated as always-async replicas. It furthermore prioritizes the
+// possible semi-sync replicas based on SemiSyncPriority, PromotionRule and hostname (fallback).
 func ClassifyAndPrioritizeReplicas(replicas []*Instance, includeNonReplicatingInstance *InstanceKey) (possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance, excludedReplicas []*Instance) {
-	// Filter out downtimed and down replicas
+	// Classify replicas into excluded, async and possible semi-sync replicas
 	possibleSemiSyncReplicas = make([]*Instance, 0)
 	asyncReplicas = make([]*Instance, 0)
 	excludedReplicas = make([]*Instance, 0)
 	for _, replica := range replicas {
-		if replica.IsDowntimed || !replica.IsLastCheckValid || (!replica.Key.Equals(includeNonReplicatingInstance) && !replica.ReplicaRunning()) {
+		isReplicating := replica.ReplicationIOThreadRuning && replica.ReplicationSQLThreadRuning
+		if !replica.IsLastCheckValid || (!replica.Key.Equals(includeNonReplicatingInstance) && !isReplicating) {
 			excludedReplicas = append(excludedReplicas, replica)
-		} else if replica.SemiSyncPriority == 0 {
+		} else if replica.SemiSyncPriority == 0 || replica.IsDowntimed {
 			asyncReplicas = append(asyncReplicas, replica)
 		} else {
 			possibleSemiSyncReplicas = append(possibleSemiSyncReplicas, replica)
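
The behavioral change in this hunk, as a minimal sketch (hypothetical host, package-internal): a downtimed but otherwise healthy replica now stays visible to the enforcement logic as a forced-async replica instead of vanishing from the analysis.

	r := NewInstance()
	r.Key = InstanceKey{Hostname: "replica-3", Port: 3306}
	r.SemiSyncPriority = 1
	r.IsDowntimed = true
	r.IsLastCheckValid = true
	r.ReplicationIOThreadRuning = true
	r.ReplicationSQLThreadRuning = true
	_, async, _ := ClassifyAndPrioritizeReplicas([]*Instance{r}, nil)
	// len(async) == 1; before this change the replica would have landed in excludedReplicas,
	// so a recovery could presumably never have switched semi-sync off on it
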
@@ -726,8 +727,8 @@ func determineSemiSyncReplicaActionsForEnoughTopology(possibleSemiSyncReplicas [
 // LogSemiSyncReplicaAnalysis outputs the analysis results for a semi-sync analysis using the given log function
 func LogSemiSyncReplicaAnalysis(possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance, excludedReplicas []*Instance, actions map[*Instance]bool, logf func(s string, a ...interface{})) {
 	logReplicas("possible semi-sync replicas (in priority order)", possibleSemiSyncReplicas, logf)
-	logReplicas("always-async replicas", asyncReplicas, logf)
-	logReplicas("excluded replicas (downtimed/defunct)", excludedReplicas, logf)
+	logReplicas("always-async replicas (zero-priority or downtimed)", asyncReplicas, logf)
+	logReplicas("excluded replicas (last check failed or not replicating)", excludedReplicas, logf)
 	if len(actions) > 0 {
 		logf("semi-sync: suggested actions:")
 		for replica, enable := range actions {
@@ -742,7 +743,7 @@ func logReplicas(description string, replicas []*Instance, logf func(s string, a
 	if len(replicas) > 0 {
 		logf("semi-sync: %s:", description)
 		for _, replica := range replicas {
-			logf("semi-sync: - %s: semi-sync enabled = %t, priority = %d, promotion rule = %s, downtimed = %t, last check = %t, replicating = %t", replica.Key.String(), replica.SemiSyncReplicaEnabled, replica.SemiSyncPriority, replica.PromotionRule, replica.IsDowntimed, replica.IsLastCheckValid, replica.ReplicaRunning())
+			logf("semi-sync: - %s: semi-sync enabled = %t, priority = %d, promotion rule = %s, downtimed = %t, last check = %t, replicating = %t", replica.Key.String(), replica.SemiSyncReplicaEnabled, replica.SemiSyncPriority, replica.PromotionRule, replica.IsDowntimed, replica.IsLastCheckValid, replica.ReplicationIOThreadRuning && replica.ReplicationSQLThreadRuning)
 		}
 	} else {
 		logf("semi-sync: %s: (none)", description)

go/logic/topology_recovery.go: 10 changes (5 additions & 5 deletions)
@@ -1497,11 +1497,11 @@ func checkAndRecoverLockedSemiSyncMaster(analysisEntry inst.ReplicationAnalysis,
 	return recoverSemiSyncReplicas(topologyRecovery, analysisEntry, false)
 }
 
-// checkAndRecoverMasterWithTooManySemiSyncReplicas
-func checkAndRecoverMasterWithTooManySemiSyncReplicas(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
+// checkAndRecoverMasterWithIncorrectSemiSyncReplicas
+func checkAndRecoverMasterWithIncorrectSemiSyncReplicas(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
 	topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, true, true)
 	if topologyRecovery == nil {
-		AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another RecoverMasterWithTooManySemiSyncReplicas.", analysisEntry.AnalyzedInstanceKey))
+		AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another RecoverMasterWithIncorrectSemiSyncReplicas.", analysisEntry.AnalyzedInstanceKey))
 		return false, nil, err
 	}
 	return recoverSemiSyncReplicas(topologyRecovery, analysisEntry, true)
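
Note: both semi-sync recoveries funnel into the same recoverSemiSyncReplicas helper; only the final boolean differs (false in checkAndRecoverLockedSemiSyncMaster above, true here), presumably selecting exact-topology enforcement for this recovery path.
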
@@ -1735,8 +1735,8 @@ func getCheckAndRecoverFunction(analysisCode inst.AnalysisCode, analyzedInstance
 		} else {
 			return checkAndRecoverLockedSemiSyncMaster, true
 		}
-	case inst.MasterWithTooManySemiSyncReplicas:
-		return checkAndRecoverMasterWithTooManySemiSyncReplicas, true
+	case inst.MasterWithIncorrectSemiSyncReplicas:
+		return checkAndRecoverMasterWithIncorrectSemiSyncReplicas, true
 	// intermediate master
 	case inst.DeadIntermediateMaster:
 		return checkAndRecoverDeadIntermediateMaster, true