From 7ecc525a9665431029535a402e86cbf42c616b17 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Mon, 26 Aug 2019 10:21:21 +0300 Subject: [PATCH 1/3] Recovery processes ending with "&" are executed asynchronously --- docs/configuration-recovery.md | 2 + go/inst/instance_topology.go | 2 +- go/logic/topology_recovery.go | 70 +++++++++++++++++++--------------- 3 files changed, 42 insertions(+), 32 deletions(-) diff --git a/docs/configuration-recovery.md b/docs/configuration-recovery.md index 4935a133b..83ac6b3db 100644 --- a/docs/configuration-recovery.md +++ b/docs/configuration-recovery.md @@ -66,6 +66,8 @@ These hooks are available for recoveries: - `PostUnsuccessfulFailoverProcesses`: executed at the end of any unsuccessful recovery. - `PostGracefulTakeoverProcesses`: executed on planned, graceful master takeover, after the old master is positioned under the newly promoted master. +Any process command that ends with `"&"` will be executed asynchronously, and a failure for such process is ignored. + All of the above are lists of commands which `orchestrator` executes sequentially, in order of definition. A naive implementation might look like: diff --git a/go/inst/instance_topology.go b/go/inst/instance_topology.go index 6e9ad684c..843b8192f 100644 --- a/go/inst/instance_topology.go +++ b/go/inst/instance_topology.go @@ -1857,7 +1857,7 @@ func sortedReplicas(replicas [](*Instance), stopReplicationMethod StopReplicatio // This function assumes given `replicas` argument is indeed a list of instances all replicating // from the same master (the result of `getReplicasForSorting()` is appropriate) func sortedReplicasDataCenterHint(replicas [](*Instance), stopReplicationMethod StopReplicationMethod, dataCenterHint string) [](*Instance) { - if len(replicas) == 0 { + if len(replicas) <= 1 { return replicas } replicas = StopSlaves(replicas, stopReplicationMethod, time.Duration(config.Config.InstanceBulkOperationsWaitTimeoutSeconds)*time.Second) diff --git a/go/logic/topology_recovery.go b/go/logic/topology_recovery.go index e07a31b2c..e13416359 100644 --- a/go/logic/topology_recovery.go +++ b/go/logic/topology_recovery.go @@ -252,9 +252,14 @@ func resolveRecovery(topologyRecovery *TopologyRecovery, successorInstance *inst } } -// replaceCommandPlaceholders replaces agreed-upon placeholders with analysis data -func replaceCommandPlaceholders(command string, topologyRecovery *TopologyRecovery) string { +// prepareCommand replaces agreed-upon placeholders with analysis data +func prepareCommand(command string, topologyRecovery *TopologyRecovery) (result string, async bool) { analysisEntry := &topologyRecovery.AnalysisEntry + command = strings.TrimSpace(command) + if strings.HasSuffix(command, "&") { + command = strings.TrimRight(command, "&") + async = true + } command = strings.Replace(command, "{failureType}", string(analysisEntry.Analysis), -1) command = strings.Replace(command, "{failureDescription}", analysisEntry.Description, -1) command = strings.Replace(command, "{command}", analysisEntry.CommandHint, -1) @@ -286,7 +291,7 @@ func replaceCommandPlaceholders(command string, topologyRecovery *TopologyRecove command = strings.Replace(command, "{slaveHosts}", analysisEntry.SlaveHosts.ToCommaDelimitedList(), -1) command = strings.Replace(command, "{replicaHosts}", analysisEntry.SlaveHosts.ToCommaDelimitedList(), -1) - return command + return command, async } // applyEnvironmentVariables sets the relevant environment variables for a recovery @@ -322,50 +327,52 @@ func applyEnvironmentVariables(topologyRecovery *TopologyRecovery) []string { return env } +func executeProcess(command string, env []string, topologyRecovery *TopologyRecovery, fullDescription string) (err error) { + // Log the command to be run and record how long it takes as this may be useful + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Running %s: %s", fullDescription, command)) + start := time.Now() + var info string + if err = os.CommandRun(command, env); err == nil { + info = fmt.Sprintf("Completed %s in %v", fullDescription, time.Since(start)) + } else { + info = fmt.Sprintf("Execution of %s failed in %v with error: %v", fullDescription, time.Since(start), err) + log.Errorf(info) + } + AuditTopologyRecovery(topologyRecovery, info) + return err +} + // executeProcesses executes a list of processes -func executeProcesses(processes []string, description string, topologyRecovery *TopologyRecovery, failOnError bool) error { +func executeProcesses(processes []string, description string, topologyRecovery *TopologyRecovery, failOnError bool) (err error) { if len(processes) == 0 { AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("No %s hooks to run", description)) return nil } - var err error AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Running %d %s hooks", len(processes), description)) for i, command := range processes { fullDescription := fmt.Sprintf("%s hook %d of %d", description, i+1, len(processes)) - command := replaceCommandPlaceholders(command, topologyRecovery) + command, async := prepareCommand(command, topologyRecovery) env := applyEnvironmentVariables(topologyRecovery) - // Log the command to be run and record how long it takes as this may be useful - AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Running %s: %s", fullDescription, command)) - start := time.Now() - if cmdErr := os.CommandRun(command, env); cmdErr == nil { - info := fmt.Sprintf("Completed %s in %v", - fullDescription, time.Since(start)) - AuditTopologyRecovery(topologyRecovery, info) + if async { + // Ignore errors + go executeProcess(command, env, topologyRecovery, fullDescription) } else { - info := fmt.Sprintf("Execution of %s failed in %v with error: %v", - fullDescription, time.Since(start), cmdErr) - AuditTopologyRecovery(topologyRecovery, info) - log.Errorf(info) - // FIXME: It would be good to additionally include command execution output to the auditing - - if err == nil { - // Note first error - err = cmdErr - } - if failOnError { - AuditTopologyRecovery( - topologyRecovery, - fmt.Sprintf("Not running further %s hooks", description)) - return err + if cmdErr := executeProcess(command, env, topologyRecovery, fullDescription); cmdErr != nil { + if failOnError { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Not running further %s hooks", description)) + return cmdErr + } + if err == nil { + // Keep first error encountered + err = cmdErr + } } } } - AuditTopologyRecovery( - topologyRecovery, - fmt.Sprintf("done running %s hooks", description)) + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("done running %s hooks", description)) return err } @@ -1873,6 +1880,7 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey) return nil, nil, err } preGracefulTakeoverTopologyRecovery := &TopologyRecovery{ + SuccessorKey: &designatedInstance.Key, AnalysisEntry: analysisEntry, } if err := executeProcesses(config.Config.PreGracefulTakeoverProcesses, "PreGracefulTakeoverProcesses", preGracefulTakeoverTopologyRecovery, true); err != nil { From 17329eb3d58bb5e9efabd3068544bd08aa86e10a Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Mon, 26 Aug 2019 10:24:25 +0300 Subject: [PATCH 2/3] PreGracefulTakeoverProcesses get {successorHost}, {successorPort} information --- go/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/config/config.go b/go/config/config.go index dae3b3b4a..3c1ad4c80 100644 --- a/go/config/config.go +++ b/go/config/config.go @@ -227,7 +227,7 @@ type Configuration struct { RecoverIntermediateMasterClusterFilters []string // Only do IM recovery on clusters matching these regexp patterns (of course the ".*" pattern matches everything) ProcessesShellCommand string // Shell that executes command scripts OnFailureDetectionProcesses []string // Processes to execute when detecting a failover scenario (before making a decision whether to failover or not). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {successorHost}, {successorPort}, {successorAlias}, {countReplicas}, {replicaHosts}, {isDowntimed}, {autoMasterRecovery}, {autoIntermediateMasterRecovery} - PreGracefulTakeoverProcesses []string // Processes to execute before doing a failover (aborting operation should any once of them exits with non-zero code; order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {countReplicas}, {replicaHosts}, {isDowntimed} + PreGracefulTakeoverProcesses []string // Processes to execute before doing a failover (aborting operation should any once of them exits with non-zero code; order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {successorHost}, {successorPort}, {countReplicas}, {replicaHosts}, {isDowntimed} PreFailoverProcesses []string // Processes to execute before doing a failover (aborting operation should any once of them exits with non-zero code; order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {countReplicas}, {replicaHosts}, {isDowntimed} PostFailoverProcesses []string // Processes to execute after doing a failover (order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {successorHost}, {successorPort}, {successorAlias}, {countReplicas}, {replicaHosts}, {isDowntimed}, {isSuccessful}, {lostReplicas}, {countLostReplicas} PostUnsuccessfulFailoverProcesses []string // Processes to execute after a not-completely-successful failover (order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {successorHost}, {successorPort}, {successorAlias}, {countReplicas}, {replicaHosts}, {isDowntimed}, {isSuccessful}, {lostReplicas}, {countLostReplicas} From 479fbfb18344bdf7ecdf6135549106918abcb006 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Mon, 26 Aug 2019 13:18:33 +0300 Subject: [PATCH 3/3] indicate async in description --- go/logic/topology_recovery.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/go/logic/topology_recovery.go b/go/logic/topology_recovery.go index e13416359..50a9b2a52 100644 --- a/go/logic/topology_recovery.go +++ b/go/logic/topology_recovery.go @@ -351,11 +351,13 @@ func executeProcesses(processes []string, description string, topologyRecovery * AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Running %d %s hooks", len(processes), description)) for i, command := range processes { - fullDescription := fmt.Sprintf("%s hook %d of %d", description, i+1, len(processes)) - command, async := prepareCommand(command, topologyRecovery) env := applyEnvironmentVariables(topologyRecovery) + fullDescription := fmt.Sprintf("%s hook %d of %d", description, i+1, len(processes)) + if async { + fullDescription = fmt.Sprintf("%s (async)", fullDescription) + } if async { // Ignore errors go executeProcess(command, env, topologyRecovery, fullDescription)