Skip to content
This repository was archived by the owner on Feb 18, 2025. It is now read-only.

Recovery processes ending with "&" are executed asynchronously #968

Merged
merged 3 commits into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/configuration-recovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ These hooks are available for recoveries:
- `PostUnsuccessfulFailoverProcesses`: executed at the end of any unsuccessful recovery.
- `PostGracefulTakeoverProcesses`: executed on planned, graceful master takeover, after the old master is positioned under the newly promoted master.

Any process command that ends with `"&"` will be executed asynchronously, and a failure of such a process is ignored.

All of the above are lists of commands which `orchestrator` executes sequentially, in order of definition.

A naive implementation might look like:
Expand Down
2 changes: 1 addition & 1 deletion go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ type Configuration struct {
RecoverIntermediateMasterClusterFilters []string // Only do IM recovery on clusters matching these regexp patterns (of course the ".*" pattern matches everything)
ProcessesShellCommand string // Shell that executes command scripts
OnFailureDetectionProcesses []string // Processes to execute when detecting a failover scenario (before making a decision whether to failover or not). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {successorHost}, {successorPort}, {successorAlias}, {countReplicas}, {replicaHosts}, {isDowntimed}, {autoMasterRecovery}, {autoIntermediateMasterRecovery}
PreGracefulTakeoverProcesses []string // Processes to execute before doing a failover (aborting operation should any once of them exits with non-zero code; order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {countReplicas}, {replicaHosts}, {isDowntimed}
PreGracefulTakeoverProcesses []string // Processes to execute before doing a failover (aborting operation should any once of them exits with non-zero code; order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {successorHost}, {successorPort}, {countReplicas}, {replicaHosts}, {isDowntimed}
PreFailoverProcesses []string // Processes to execute before doing a failover (aborting operation should any once of them exits with non-zero code; order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {countReplicas}, {replicaHosts}, {isDowntimed}
PostFailoverProcesses []string // Processes to execute after doing a failover (order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {successorHost}, {successorPort}, {successorAlias}, {countReplicas}, {replicaHosts}, {isDowntimed}, {isSuccessful}, {lostReplicas}, {countLostReplicas}
PostUnsuccessfulFailoverProcesses []string // Processes to execute after a not-completely-successful failover (order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {successorHost}, {successorPort}, {successorAlias}, {countReplicas}, {replicaHosts}, {isDowntimed}, {isSuccessful}, {lostReplicas}, {countLostReplicas}
Expand Down
2 changes: 1 addition & 1 deletion go/inst/instance_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -1857,7 +1857,7 @@ func sortedReplicas(replicas [](*Instance), stopReplicationMethod StopReplicatio
// This function assumes given `replicas` argument is indeed a list of instances all replicating
// from the same master (the result of `getReplicasForSorting()` is appropriate)
func sortedReplicasDataCenterHint(replicas [](*Instance), stopReplicationMethod StopReplicationMethod, dataCenterHint string) [](*Instance) {
if len(replicas) == 0 {
if len(replicas) <= 1 {
return replicas
}
replicas = StopSlaves(replicas, stopReplicationMethod, time.Duration(config.Config.InstanceBulkOperationsWaitTimeoutSeconds)*time.Second)
Expand Down
76 changes: 43 additions & 33 deletions go/logic/topology_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,14 @@ func resolveRecovery(topologyRecovery *TopologyRecovery, successorInstance *inst
}
}

// replaceCommandPlaceholders replaces agreed-upon placeholders with analysis data
func replaceCommandPlaceholders(command string, topologyRecovery *TopologyRecovery) string {
// prepareCommand replaces agreed-upon placeholders with analysis data
func prepareCommand(command string, topologyRecovery *TopologyRecovery) (result string, async bool) {
analysisEntry := &topologyRecovery.AnalysisEntry
command = strings.TrimSpace(command)
if strings.HasSuffix(command, "&") {
command = strings.TrimRight(command, "&")
async = true
}
command = strings.Replace(command, "{failureType}", string(analysisEntry.Analysis), -1)
command = strings.Replace(command, "{failureDescription}", analysisEntry.Description, -1)
command = strings.Replace(command, "{command}", analysisEntry.CommandHint, -1)
Expand Down Expand Up @@ -286,7 +291,7 @@ func replaceCommandPlaceholders(command string, topologyRecovery *TopologyRecove
command = strings.Replace(command, "{slaveHosts}", analysisEntry.SlaveHosts.ToCommaDelimitedList(), -1)
command = strings.Replace(command, "{replicaHosts}", analysisEntry.SlaveHosts.ToCommaDelimitedList(), -1)

return command
return command, async
}

// applyEnvironmentVariables sets the relevant environment variables for a recovery
Expand Down Expand Up @@ -322,50 +327,54 @@ func applyEnvironmentVariables(topologyRecovery *TopologyRecovery) []string {
return env
}

// executeProcess runs a single recovery hook command and audits its start,
// duration, and outcome. It returns the error from command execution, if any.
func executeProcess(command string, env []string, topologyRecovery *TopologyRecovery, fullDescription string) (err error) {
	// Audit before execution so the attempt is recorded even if the command hangs.
	AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Running %s: %s", fullDescription, command))
	start := time.Now()
	err = os.CommandRun(command, env)
	var info string
	if err != nil {
		info = fmt.Sprintf("Execution of %s failed in %v with error: %v", fullDescription, time.Since(start), err)
		log.Errorf(info)
	} else {
		info = fmt.Sprintf("Completed %s in %v", fullDescription, time.Since(start))
	}
	AuditTopologyRecovery(topologyRecovery, info)
	return err
}

// executeProcesses executes a list of processes
func executeProcesses(processes []string, description string, topologyRecovery *TopologyRecovery, failOnError bool) error {
func executeProcesses(processes []string, description string, topologyRecovery *TopologyRecovery, failOnError bool) (err error) {
if len(processes) == 0 {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("No %s hooks to run", description))
return nil
}

var err error
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Running %d %s hooks", len(processes), description))
for i, command := range processes {
fullDescription := fmt.Sprintf("%s hook %d of %d", description, i+1, len(processes))

command := replaceCommandPlaceholders(command, topologyRecovery)
command, async := prepareCommand(command, topologyRecovery)
env := applyEnvironmentVariables(topologyRecovery)

// Log the command to be run and record how long it takes as this may be useful
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Running %s: %s", fullDescription, command))
start := time.Now()
if cmdErr := os.CommandRun(command, env); cmdErr == nil {
info := fmt.Sprintf("Completed %s in %v",
fullDescription, time.Since(start))
AuditTopologyRecovery(topologyRecovery, info)
fullDescription := fmt.Sprintf("%s hook %d of %d", description, i+1, len(processes))
if async {
fullDescription = fmt.Sprintf("%s (async)", fullDescription)
}
if async {
// Ignore errors
go executeProcess(command, env, topologyRecovery, fullDescription)
} else {
info := fmt.Sprintf("Execution of %s failed in %v with error: %v",
fullDescription, time.Since(start), cmdErr)
AuditTopologyRecovery(topologyRecovery, info)
log.Errorf(info)
// FIXME: It would be good to additionally include command execution output to the auditing

if err == nil {
// Note first error
err = cmdErr
}
if failOnError {
AuditTopologyRecovery(
topologyRecovery,
fmt.Sprintf("Not running further %s hooks", description))
return err
if cmdErr := executeProcess(command, env, topologyRecovery, fullDescription); cmdErr != nil {
if failOnError {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Not running further %s hooks", description))
return cmdErr
}
if err == nil {
// Keep first error encountered
err = cmdErr
}
}
}
}
AuditTopologyRecovery(
topologyRecovery,
fmt.Sprintf("done running %s hooks", description))
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("done running %s hooks", description))
return err
}

Expand Down Expand Up @@ -1873,6 +1882,7 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey)
return nil, nil, err
}
preGracefulTakeoverTopologyRecovery := &TopologyRecovery{
SuccessorKey: &designatedInstance.Key,
AnalysisEntry: analysisEntry,
}
if err := executeProcesses(config.Config.PreGracefulTakeoverProcesses, "PreGracefulTakeoverProcesses", preGracefulTakeoverTopologyRecovery, true); err != nil {
Expand Down