4 changes: 4 additions & 0 deletions go/inst/analysis.go
@@ -38,6 +38,7 @@ const (
MasterSingleSlaveDead = "MasterSingleSlaveDead"
AllMasterSlavesNotReplicating = "AllMasterSlavesNotReplicating"
AllMasterSlavesNotReplicatingOrDead = "AllMasterSlavesNotReplicatingOrDead"
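// LockedSemiSyncMaster: semi-sync is enabled on the master but too few semi-sync replicas are connected, so commits may block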
LockedSemiSyncMaster = "LockedSemiSyncMaster"
MasterWithoutSlaves = "MasterWithoutSlaves"
DeadCoMaster = "DeadCoMaster"
DeadCoMasterAndSomeSlaves = "DeadCoMasterAndSomeSlaves"
@@ -143,6 +144,9 @@ type ReplicationAnalysis struct {
MariaDBGTIDImmediateTopology bool
BinlogServerImmediateTopology bool
SemiSyncMasterEnabled bool
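// The three fields below reflect the master's Rpl_semi_sync_master_status, rpl_semi_sync_master_wait_for_slave_count and Rpl_semi_sync_master_clients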
SemiSyncMasterStatus bool
SemiSyncMasterWaitForReplicaCount uint
SemiSyncMasterClients uint
CountSemiSyncReplicasEnabled uint
CountLoggingReplicas uint
CountStatementBasedLoggingReplicas uint
24 changes: 20 additions & 4 deletions go/inst/analysis_dao.go
@@ -151,12 +151,21 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
MIN(
master_instance.supports_oracle_gtid
) AS supports_oracle_gtid,
MIN(
master_instance.slave_lag_seconds
) AS master_replication_lag_seconds,
MIN(
master_instance.semi_sync_master_enabled
) AS semi_sync_master_enabled,
MIN(
master_instance.semi_sync_master_wait_for_slave_count
) AS semi_sync_master_wait_for_slave_count,
MIN(
master_instance.semi_sync_master_clients
) AS semi_sync_master_clients,
MIN(
master_instance.semi_sync_master_status
) AS semi_sync_master_status,
SUM(
replica_instance.is_co_master
) AS count_co_master_replicas,
@@ -308,9 +317,12 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
a.BinlogServerImmediateTopology = countValidBinlogServerSlaves == a.CountValidReplicas && a.CountValidReplicas > 0
a.PseudoGTIDImmediateTopology = m.GetBool("is_pseudo_gtid")
a.SemiSyncMasterEnabled = m.GetBool("semi_sync_master_enabled")
a.SemiSyncMasterStatus = m.GetBool("semi_sync_master_status")
a.CountSemiSyncReplicasEnabled = m.GetUint("count_semi_sync_replicas")
-countValidSemiSyncReplicasEnabled := m.GetUint("count_valid_semi_sync_replicas")
-semiSyncMasterWaitForReplicaCount := m.GetUint("semi_sync_master_wait_for_slave_count")
+// countValidSemiSyncReplicasEnabled := m.GetUint("count_valid_semi_sync_replicas")
+a.SemiSyncMasterWaitForReplicaCount = m.GetUint("semi_sync_master_wait_for_slave_count")
+a.SemiSyncMasterClients = m.GetUint("semi_sync_master_clients")
+// masterReplicationLagSeconds := m.GetInt("master_replication_lag_seconds")

a.MinReplicaGTIDMode = m.GetString("min_replica_gtid_mode")
a.MaxReplicaGTIDMode = m.GetString("max_replica_gtid_mode")
@@ -359,6 +371,10 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
a.Analysis = UnreachableMaster
a.Description = "Master cannot be reached by orchestrator but it has replicating replicas; possibly a network/host issue"
//
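// The master reports semi-sync enabled and active, yet fewer semi-sync replicas are connected than
// rpl_semi_sync_master_wait_for_slave_count requires: commits may be blocked waiting for acknowledgements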
} else if a.IsMaster && a.SemiSyncMasterEnabled && a.SemiSyncMasterStatus && a.SemiSyncMasterWaitForReplicaCount > 0 && a.SemiSyncMasterClients < a.SemiSyncMasterWaitForReplicaCount /*&& masterReplicationLagSeconds > config.Config.ReasonableReplicationLagSeconds */ {
a.Analysis = LockedSemiSyncMaster
a.Description = "Semi sync master is locked since it doesn't get enough replica acknowledgements"
//
} else if a.IsMaster && a.LastCheckValid && a.CountReplicas == 1 && a.CountValidReplicas == a.CountReplicas && a.CountValidReplicatingReplicas == 0 {
a.Analysis = MasterSingleSlaveNotReplicating
a.Description = "Master is reachable but its single slave is not replicating"
@@ -517,10 +533,10 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
a.StructureAnalysis = append(a.StructureAnalysis, NoWriteableMasterStructureWarning)
}

-if a.IsMaster && a.SemiSyncMasterEnabled && countValidSemiSyncReplicasEnabled == 0 {
+if a.IsMaster && a.SemiSyncMasterEnabled && a.SemiSyncMasterWaitForReplicaCount > 0 && a.SemiSyncMasterClients == 0 {
a.StructureAnalysis = append(a.StructureAnalysis, NoValidSemiSyncReplicasStructureWarning)
}
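// Some, but not enough, semi-sync replicas are connected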
-if a.IsMaster && a.SemiSyncMasterEnabled && countValidSemiSyncReplicasEnabled > 0 && countValidSemiSyncReplicasEnabled < semiSyncMasterWaitForReplicaCount {
+if a.IsMaster && a.SemiSyncMasterEnabled && a.SemiSyncMasterWaitForReplicaCount > 0 && a.SemiSyncMasterClients > 0 && a.SemiSyncMasterClients < a.SemiSyncMasterWaitForReplicaCount {
a.StructureAnalysis = append(a.StructureAnalysis, NotEnoughValidSemiSyncReplicasStructureWarning)
}

107 changes: 90 additions & 17 deletions go/inst/instance_dao.go
@@ -2846,8 +2846,8 @@ func ReadHistoryClusterInstances(clusterName string, historyTimestampPattern str
}

// RecordInstanceCoordinatesHistory snapshots the binlog coordinates of instances
-func RecordInstanceCoordinatesHistory() error {
-{
+func RecordInstanceCoordinatesHistory(instanceKey *InstanceKey) error {
+if instanceKey == nil {
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
delete from database_instance_coordinates_history
@@ -2859,23 +2859,35 @@ func RecordInstanceCoordinatesHistory() error {
}
ExecDBWriteFunc(writeFunc)
}
-writeFunc := func() error {
-_, err := db.ExecOrchestrator(`
-insert into
-database_instance_coordinates_history (
-hostname, port, last_seen, recorded_timestamp,
-binary_log_file, binary_log_pos, relay_log_file, relay_log_pos
-)
-select
-hostname, port, last_seen, NOW(),
-binary_log_file, binary_log_pos, relay_log_file, relay_log_pos
-from
-database_instance
-where
-binary_log_file != ''
-OR relay_log_file != ''
-`,
-)
+args := sqlutils.Args()
+whereClause := ""
+if instanceKey != nil {
+whereClause = "and hostname=? and port=?"
+args = append(args, instanceKey.Hostname, instanceKey.Port)
+}
+query := fmt.Sprintf(`
+insert into
+database_instance_coordinates_history (
+hostname, port, last_seen, recorded_timestamp,
+binary_log_file, binary_log_pos, relay_log_file, relay_log_pos
+)
+select
+hostname, port, last_seen, NOW(),
+binary_log_file, binary_log_pos, relay_log_file, relay_log_pos
+from
+database_instance
+where
+(
+binary_log_file != ''
+or relay_log_file != ''
+)
+%s
+`,
+whereClause,
+)
+
+writeFunc := func() error {
+_, err := db.ExecOrchestrator(query, args...)
return log.Errore(err)
}
return ExecDBWriteFunc(writeFunc)
@@ -2905,6 +2917,67 @@ func GetHeuristiclyRecentCoordinatesForInstance(instanceKey *InstanceKey) (selfC
return selfCoordinates, relayLogCoordinates, err
}

// AreInstanceBinlogCoordinatesStaleInPastSeconds checks whether the binlog coordinates recorded for an instance
// over the given number of past seconds have not changed at all (i.e. are stale), requiring at least
// minExpectedSamples samples within that window before declaring staleness.
func AreInstanceBinlogCoordinatesStaleInPastSeconds(instanceKey *InstanceKey, seconds int64, minExpectedSamples int64) (stale bool, err error) {
coordinatesMap := make(map[string]int64)
recentEntryMet := false
oldEntryMet := false
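// Scan recorded coordinates from newest to oldest. recent_entry_met marks a sample inside the requested
// window; old_entry_met marks a sample older than the window, i.e. we have sufficient history to judge.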
query := `
select
concat(binary_log_file, ':', binary_log_pos) as binlog_coordinates,
recorded_timestamp >= NOW() - INTERVAL ? SECOND as recent_entry_met,
recorded_timestamp < NOW() - INTERVAL ? SECOND as old_entry_met
from
database_instance_coordinates_history
where
hostname = ?
and port = ?
order by
history_id desc
`
err = db.QueryOrchestrator(query,
sqlutils.Args(seconds, seconds, instanceKey.Hostname, instanceKey.Port),
func(m sqlutils.RowMap) error {
if oldEntryMet {
// we already have all the data we need; ignore any further (older) rows
return nil
}
coordinates := m.GetString("binlog_coordinates")
coordinatesMap[coordinates] = coordinatesMap[coordinates] + 1
if m.GetBool("recent_entry_met") {
recentEntryMet = true
}
if m.GetBool("old_entry_met") {
oldEntryMet = true
}
return nil
})
if err != nil {
return false, err
}
if !recentEntryMet {
// Not recent information. Irrelevant
return false, nil
}
if !oldEntryMet {
// Not enough history
return false, nil
}
if len(coordinatesMap) != 1 {
// either 0 (==no data) or >1 (==multiple different coordinates, so not stale)
return false, nil
}
for _, count := range coordinatesMap {
// We actually know coordinatesMap has 1 single row, iteration is over that row only
if count < minExpectedSamples {
// Not enough samples within time range
return false, nil
}
}
return true, nil
}

// GetPreviousKnownRelayLogCoordinatesForInstance returns known relay log coordinates, that are not the
// exact current coordinates
func GetPreviousKnownRelayLogCoordinatesForInstance(instance *Instance) (relayLogCoordinates *BinlogCoordinates, err error) {
2 changes: 1 addition & 1 deletion go/logic/orchestrator.go
@@ -546,7 +546,7 @@ func ContinuousDiscovery() {
// Various periodic internal maintenance tasks
go func() {
if IsLeaderOrActive() {
-go inst.RecordInstanceCoordinatesHistory()
+go inst.RecordInstanceCoordinatesHistory(nil)
go inst.ReviewUnseenInstances()
go inst.InjectUnseenMasters()

22 changes: 22 additions & 0 deletions go/logic/topology_recovery.go
@@ -1395,6 +1395,15 @@ func checkAndRecoverDeadCoMaster(analysisEntry inst.ReplicationAnalysis, candida
return true, topologyRecovery, err
}

// checkAndRecoverLockedSemiSyncMaster checks whether a semi-sync-locked master has stale binlog coordinates
func checkAndRecoverLockedSemiSyncMaster(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (bool, *TopologyRecovery, error) {
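// For now this only probes the situation: if the master's binlog coordinates have not advanced within
// ReasonableReplicationLagSeconds (given at least 3 recorded samples), writes are likely blocked.
// No recovery action is taken yet.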
coordinatesAreStale, err := inst.AreInstanceBinlogCoordinatesStaleInPastSeconds(&analysisEntry.AnalyzedInstanceKey, int64(config.Config.ReasonableReplicationLagSeconds), 3)
if err != nil {
return false, nil, log.Errore(err)
}
if coordinatesAreStale {
log.Debugf("checkAndRecoverLockedSemiSyncMaster: binlog coordinates on %+v appear stale", analysisEntry.AnalyzedInstanceKey)
}
return false, nil, nil
}

// checkAndRecoverGenericProblem is a general-purpose recovery function
func checkAndRecoverGenericProblem(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (bool, *TopologyRecovery, error) {
return false, nil, nil
@@ -1469,6 +1478,11 @@ func emergentlyRestartReplicationOnTopologyInstanceReplicas(instanceKey *inst.In
}
}

// emergentlyRecordBinlogCoordinates snapshots the binlog coordinates of a single instance, on demand
func emergentlyRecordBinlogCoordinates(instanceKey *inst.InstanceKey) {
err := inst.RecordInstanceCoordinatesHistory(instanceKey)
log.Errore(err)
}

// checkAndExecuteFailureDetectionProcesses tries to register for failure detection and potentially executes
// failure-detection processes.
func checkAndExecuteFailureDetectionProcesses(analysisEntry inst.ReplicationAnalysis, skipProcesses bool) (detectionRegistrationSuccess bool, processesExecutionAttempted bool, err error) {
@@ -1499,6 +1513,12 @@ func getCheckAndRecoverFunction(analysisCode inst.AnalysisCode, analyzedInstance
} else {
return checkAndRecoverDeadMaster, true
}
case inst.LockedSemiSyncMaster:
if isInEmergencyOperationGracefulPeriod(analyzedInstanceKey) {
return checkAndRecoverGenericProblem, false
} else {
return checkAndRecoverLockedSemiSyncMaster, true
}
// intermediate master
case inst.DeadIntermediateMaster:
return checkAndRecoverDeadIntermediateMaster, true
@@ -1546,6 +1566,8 @@ func runEmergentOperations(analysisEntry *inst.ReplicationAnalysis) {
go emergentlyReadTopologyInstanceReplicas(&analysisEntry.AnalyzedInstanceKey, analysisEntry.Analysis)
case inst.UnreachableMasterWithLaggingReplicas:
go emergentlyRestartReplicationOnTopologyInstanceReplicas(&analysisEntry.AnalyzedInstanceKey, analysisEntry.Analysis)
case inst.LockedSemiSyncMaster:
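// Start sampling the master's binlog coordinates so the recovery check can later tell whether they are stale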
go emergentlyRecordBinlogCoordinates(&analysisEntry.AnalyzedInstanceKey)
case inst.UnreachableIntermediateMasterWithLaggingReplicas:
go emergentlyRestartReplicationOnTopologyInstanceReplicas(&analysisEntry.AnalyzedInstanceKey, analysisEntry.Analysis)
case inst.AllMasterSlavesNotReplicating:
1 change: 1 addition & 0 deletions resources/public/js/cluster-analysis-shared.js
@@ -6,6 +6,7 @@ var interestingAnalysis = {
"UnreachableMasterWithStaleSlaves": true,
"UnreachableMasterWithLaggingReplicas": true,
"UnreachableMaster" : true,
"LockedSemiSyncMaster" : true,
"AllMasterSlavesNotReplicating" : true,
"AllMasterSlavesNotReplicatingOrDead" : true,
"AllMasterSlavesStale" : true,