Skip to content
This repository was archived by the owner on Feb 18, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions go/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
case registerCliCommand("stop-slave", "Replication, general", `Issue a STOP SLAVE on an instance`):
{
instanceKey, _ = inst.FigureInstanceKey(instanceKey, thisInstanceKey)
_, err := inst.StopSlave(instanceKey)
_, err := inst.StopReplication(instanceKey)
if err != nil {
log.Fatale(err)
}
Expand All @@ -618,7 +618,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
case registerCliCommand("start-slave", "Replication, general", `Issue a START SLAVE on an instance`):
{
instanceKey, _ = inst.FigureInstanceKey(instanceKey, thisInstanceKey)
_, err := inst.StartSlave(instanceKey)
_, err := inst.StartReplication(instanceKey)
if err != nil {
log.Fatale(err)
}
Expand All @@ -627,7 +627,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
case registerCliCommand("restart-slave", "Replication, general", `STOP and START SLAVE on an instance`):
{
instanceKey, _ = inst.FigureInstanceKey(instanceKey, thisInstanceKey)
_, err := inst.RestartSlave(instanceKey)
_, err := inst.RestartReplication(instanceKey)
if err != nil {
log.Fatale(err)
}
Expand All @@ -636,7 +636,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
case registerCliCommand("reset-slave", "Replication, general", `Issues a RESET SLAVE command; use with care`):
{
instanceKey, _ = inst.FigureInstanceKey(instanceKey, thisInstanceKey)
_, err := inst.ResetSlaveOperation(instanceKey)
_, err := inst.ResetReplicationOperation(instanceKey)
if err != nil {
log.Fatale(err)
}
Expand Down Expand Up @@ -732,7 +732,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
if instanceKey == nil {
log.Fatalf("Unresolved instance")
}
statements, err := inst.GetSlaveRestartPreserveStatements(instanceKey, *config.RuntimeCLIFlags.Statement)
statements, err := inst.GetReplicationRestartPreserveStatements(instanceKey, *config.RuntimeCLIFlags.Statement)
if err != nil {
log.Fatale(err)
}
Expand Down
48 changes: 24 additions & 24 deletions go/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,8 @@ func (this *HttpAPI) MakeCoMaster(params martini.Params, r render.Render, req *h
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Instance made co-master: %+v", instance.Key), Details: instance})
}

// ResetSlave makes a replica forget about its master, effectively breaking the replication
func (this *HttpAPI) ResetSlave(params martini.Params, r render.Render, req *http.Request, user auth.User) {
// ResetReplication makes a replica forget about its master, effectively breaking the replication
func (this *HttpAPI) ResetReplication(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
Expand All @@ -615,7 +615,7 @@ func (this *HttpAPI) ResetSlave(params martini.Params, r render.Render, req *htt
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
instance, err := inst.ResetSlaveOperation(&instanceKey)
instance, err := inst.ResetReplicationOperation(&instanceKey)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
Expand Down Expand Up @@ -1256,8 +1256,8 @@ func (this *HttpAPI) SkipQuery(params martini.Params, r render.Render, req *http
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Query skipped on %+v", instance.Key), Details: instance})
}

// StartSlave starts replication on given instance
func (this *HttpAPI) StartSlave(params martini.Params, r render.Render, req *http.Request, user auth.User) {
// StartReplication starts replication on given instance
func (this *HttpAPI) StartReplication(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
Expand All @@ -1268,7 +1268,7 @@ func (this *HttpAPI) StartSlave(params martini.Params, r render.Render, req *htt
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
instance, err := inst.StartSlave(&instanceKey)
instance, err := inst.StartReplication(&instanceKey)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
Expand All @@ -1277,8 +1277,8 @@ func (this *HttpAPI) StartSlave(params martini.Params, r render.Render, req *htt
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Replica started: %+v", instance.Key), Details: instance})
}

// RestartSlave stops & starts replication on given instance
func (this *HttpAPI) RestartSlave(params martini.Params, r render.Render, req *http.Request, user auth.User) {
// RestartReplication stops & starts replication on given instance
func (this *HttpAPI) RestartReplication(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
Expand All @@ -1289,7 +1289,7 @@ func (this *HttpAPI) RestartSlave(params martini.Params, r render.Render, req *h
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
instance, err := inst.RestartSlave(&instanceKey)
instance, err := inst.RestartReplication(&instanceKey)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
Expand All @@ -1298,8 +1298,8 @@ func (this *HttpAPI) RestartSlave(params martini.Params, r render.Render, req *h
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Replica restarted: %+v", instance.Key), Details: instance})
}

// StopSlave stops replication on given instance
func (this *HttpAPI) StopSlave(params martini.Params, r render.Render, req *http.Request, user auth.User) {
// StopReplication stops replication on given instance
func (this *HttpAPI) StopReplication(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
Expand All @@ -1310,7 +1310,7 @@ func (this *HttpAPI) StopSlave(params martini.Params, r render.Render, req *http
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
instance, err := inst.StopSlave(&instanceKey)
instance, err := inst.StopReplication(&instanceKey)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
Expand All @@ -1319,8 +1319,8 @@ func (this *HttpAPI) StopSlave(params martini.Params, r render.Render, req *http
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Replica stopped: %+v", instance.Key), Details: instance})
}

// StopSlaveNicely stops replication on given instance, such that sql thead is aligned with IO thread
func (this *HttpAPI) StopSlaveNicely(params martini.Params, r render.Render, req *http.Request, user auth.User) {
// StopReplicationNicely stops replication on given instance, such that sql thead is aligned with IO thread
func (this *HttpAPI) StopReplicationNicely(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
Expand All @@ -1331,7 +1331,7 @@ func (this *HttpAPI) StopSlaveNicely(params martini.Params, r render.Render, req
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
instance, err := inst.StopSlaveNicely(&instanceKey, 0)
instance, err := inst.StopReplicationNicely(&instanceKey, 0)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
Expand Down Expand Up @@ -1393,10 +1393,10 @@ func (this *HttpAPI) PurgeBinaryLogs(params martini.Params, r render.Render, req
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Binary logs flushed on: %+v", instance.Key), Details: instance})
}

// RestartSlaveStatements receives a query to execute that requires a replication restart to apply.
// RestartReplicationStatements receives a query to execute that requires a replication restart to apply.
// As an example, this may be `set global rpl_semi_sync_slave_enabled=1`. orchestrator will check
// replication status on given host and will wrap with appropriate stop/start statements, if need be.
func (this *HttpAPI) RestartSlaveStatements(params martini.Params, r render.Render, req *http.Request, user auth.User) {
func (this *HttpAPI) RestartReplicationStatements(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
Expand All @@ -1409,7 +1409,7 @@ func (this *HttpAPI) RestartSlaveStatements(params martini.Params, r render.Rend
}

query := req.URL.Query().Get("q")
statements, err := inst.GetSlaveRestartPreserveStatements(&instanceKey, query)
statements, err := inst.GetReplicationRestartPreserveStatements(&instanceKey, query)

if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
Expand Down Expand Up @@ -3634,18 +3634,18 @@ func (this *HttpAPI) RegisterRequests(m *martini.ClassicMartini) {
this.registerAPIRequest(m, "gtid-errant-reset-master/:host/:port", this.ErrantGTIDResetMaster)
this.registerAPIRequest(m, "gtid-errant-inject-empty/:host/:port", this.ErrantGTIDInjectEmpty)
this.registerAPIRequest(m, "skip-query/:host/:port", this.SkipQuery)
this.registerAPIRequest(m, "start-slave/:host/:port", this.StartSlave)
this.registerAPIRequest(m, "restart-slave/:host/:port", this.RestartSlave)
this.registerAPIRequest(m, "stop-slave/:host/:port", this.StopSlave)
this.registerAPIRequest(m, "stop-slave-nice/:host/:port", this.StopSlaveNicely)
this.registerAPIRequest(m, "reset-slave/:host/:port", this.ResetSlave)
this.registerAPIRequest(m, "start-slave/:host/:port", this.StartReplication)
this.registerAPIRequest(m, "restart-slave/:host/:port", this.RestartReplication)
this.registerAPIRequest(m, "stop-slave/:host/:port", this.StopReplication)
this.registerAPIRequest(m, "stop-slave-nice/:host/:port", this.StopReplicationNicely)
this.registerAPIRequest(m, "reset-slave/:host/:port", this.ResetReplication)
this.registerAPIRequest(m, "detach-slave/:host/:port", this.DetachReplicaMasterHost)
this.registerAPIRequest(m, "reattach-slave/:host/:port", this.ReattachReplicaMasterHost)
this.registerAPIRequest(m, "detach-slave-master-host/:host/:port", this.DetachReplicaMasterHost)
this.registerAPIRequest(m, "reattach-slave-master-host/:host/:port", this.ReattachReplicaMasterHost)
this.registerAPIRequest(m, "flush-binary-logs/:host/:port", this.FlushBinaryLogs)
this.registerAPIRequest(m, "purge-binary-logs/:host/:port/:logFile", this.PurgeBinaryLogs)
this.registerAPIRequest(m, "restart-slave-statements/:host/:port", this.RestartSlaveStatements)
this.registerAPIRequest(m, "restart-slave-statements/:host/:port", this.RestartReplicationStatements)
this.registerAPIRequest(m, "enable-semi-sync-master/:host/:port", this.EnableSemiSyncMaster)
this.registerAPIRequest(m, "disable-semi-sync-master/:host/:port", this.DisableSemiSyncMaster)
this.registerAPIRequest(m, "enable-semi-sync-replica/:host/:port", this.EnableSemiSyncReplica)
Expand Down
19 changes: 16 additions & 3 deletions go/inst/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package inst

import (
"encoding/json"
"fmt"
"strings"

Expand Down Expand Up @@ -130,7 +131,8 @@ type ReplicationAnalysis struct {
CountReplicasFailingToConnectToMaster uint
CountDowntimedReplicas uint
ReplicationDepth uint
SlaveHosts InstanceKeyMap
Replicas InstanceKeyMap
SlaveHosts InstanceKeyMap // for backwards compatibility. Equals `Replicas`
IsFailingToConnectToMaster bool
Analysis AnalysisCode
Description string
Expand Down Expand Up @@ -177,10 +179,21 @@ type ReplicationAnalysisChangelog struct {
Changelog []string
}

func (this *ReplicationAnalysis) MarshalJSON() ([]byte, error) {
i := struct {
ReplicationAnalysis
}{}
i.ReplicationAnalysis = *this
// backwards compatibility
i.SlaveHosts = i.Replicas

return json.Marshal(i)
}

// ReadReplicaHostsFromString parses and reads replica keys from comma delimited string
func (this *ReplicationAnalysis) ReadReplicaHostsFromString(replicaHostsString string) error {
this.SlaveHosts = *NewInstanceKeyMap()
return this.SlaveHosts.ReadCommaDelimitedList(replicaHostsString)
this.Replicas = *NewInstanceKeyMap()
return this.Replicas.ReadCommaDelimitedList(replicaHostsString)
}

// AnalysisString returns a human friendly description of all analysis issues
Expand Down
4 changes: 2 additions & 2 deletions go/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,8 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
a.IsBinlogServer = m.GetBool("is_binlog_server")
a.ClusterDetails.ReadRecoveryInfo()

a.SlaveHosts = *NewInstanceKeyMap()
a.SlaveHosts.ReadCommaDelimitedList(m.GetString("slave_hosts"))
a.Replicas = *NewInstanceKeyMap()
a.Replicas.ReadCommaDelimitedList(m.GetString("slave_hosts"))

countValidOracleGTIDReplicas := m.GetUint("count_valid_oracle_gtid_replicas")
a.OracleGTIDImmediateTopology = countValidOracleGTIDReplicas == a.CountValidReplicas && a.CountValidReplicas > 0
Expand Down
43 changes: 22 additions & 21 deletions go/inst/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@ type Instance struct {
Binlog_format string
BinlogRowImage string
LogBinEnabled bool
LogSlaveUpdatesEnabled bool
LogSlaveUpdatesEnabled bool // for API backwards compatibility. Equals `LogReplicationUpdatesEnabled`
LogReplicationUpdatesEnabled bool
SelfBinlogCoordinates BinlogCoordinates
MasterKey InstanceKey
MasterUUID string
AncestryUUID string
IsDetachedMaster bool

Slave_SQL_Running bool
Slave_SQL_Running bool // for API backwards compatibility. Equals `ReplicationSQLThreadRuning`
ReplicationSQLThreadRuning bool
Slave_IO_Running bool
Slave_IO_Running bool // for API backwards compatibility. Equals `ReplicationIOThreadRuning`
ReplicationIOThreadRuning bool
ReplicationSQLThreadState ReplicationThreadState
ReplicationIOThreadState ReplicationThreadState
Expand All @@ -80,9 +80,9 @@ type Instance struct {

masterExecutedGtidSet string // Not exported

SlaveLagSeconds sql.NullInt64
SlaveLagSeconds sql.NullInt64 // for API backwards compatibility. Equals `ReplicationLagSeconds`
ReplicationLagSeconds sql.NullInt64
SlaveHosts InstanceKeyMap
SlaveHosts InstanceKeyMap // for API backwards compatibility. Equals `Replicas`
Replicas InstanceKeyMap
ClusterName string
SuggestedClusterAlias string
Expand Down Expand Up @@ -134,8 +134,8 @@ type Instance struct {
// NewInstance creates a new, empty instance
func NewInstance() *Instance {
return &Instance{
SlaveHosts: make(map[InstanceKey]bool),
Problems: []string{},
Replicas: make(map[InstanceKey]bool),
Problems: []string{},
}
}

Expand All @@ -145,12 +145,13 @@ func (this *Instance) MarshalJSON() ([]byte, error) {
}{}
i.Instance = *this
// change terminology. Users of the orchestrator API can switch to new terminology and avoid using old terminology
i.Replicas = i.SlaveHosts
i.ReplicationLagSeconds = this.SlaveLagSeconds
i.ReplicationSQLThreadRuning = this.Slave_SQL_Running
i.ReplicationIOThreadRuning = this.Slave_IO_Running
i.LogReplicationUpdatesEnabled = this.LogSlaveUpdatesEnabled
//
// flip
i.SlaveHosts = i.Replicas
i.SlaveLagSeconds = this.ReplicationLagSeconds
i.LogSlaveUpdatesEnabled = this.LogReplicationUpdatesEnabled
i.Slave_SQL_Running = this.ReplicationSQLThreadRuning
i.Slave_IO_Running = this.ReplicationIOThreadRuning

return json.Marshal(i)
}

Expand Down Expand Up @@ -354,7 +355,7 @@ func (this *Instance) NextGTID() (string, error) {

// AddReplicaKey adds a replica to the list of this instance's replicas.
func (this *Instance) AddReplicaKey(replicaKey *InstanceKey) {
this.SlaveHosts.AddKey(*replicaKey)
this.Replicas.AddKey(*replicaKey)
}

// GetNextBinaryLog returns the successive, if any, binary log file to the one given
Expand Down Expand Up @@ -395,7 +396,7 @@ func (this *Instance) CanReplicateFrom(other *Instance) (bool, error) {
return false, fmt.Errorf("instance does not have binary logs enabled: %+v", other.Key)
}
if other.IsReplica() {
if !other.LogSlaveUpdatesEnabled {
if !other.LogReplicationUpdatesEnabled {
return false, fmt.Errorf("instance does not have log_slave_updates enabled: %+v", other.Key)
}
// OK for a master to not have log_slave_updates
Expand All @@ -404,7 +405,7 @@ func (this *Instance) CanReplicateFrom(other *Instance) (bool, error) {
if this.IsSmallerMajorVersion(other) && !this.IsBinlogServer() {
return false, fmt.Errorf("instance %+v has version %s, which is lower than %s on %+v ", this.Key, this.Version, other.Version, other.Key)
}
if this.LogBinEnabled && this.LogSlaveUpdatesEnabled {
if this.LogBinEnabled && this.LogReplicationUpdatesEnabled {
if this.IsSmallerBinlogFormat(other) {
return false, fmt.Errorf("Cannot replicate from %+v binlog format on %+v to %+v on %+v", other.Binlog_format, other.Key, this.Binlog_format, this.Key)
}
Expand Down Expand Up @@ -451,7 +452,7 @@ func (this *Instance) CanMove() (bool, error) {
return false, fmt.Errorf("%+v: instance is not replicating", this.Key)
}
if !this.SecondsBehindMaster.Valid {
return false, fmt.Errorf("%+v: cannot determine slave lag", this.Key)
return false, fmt.Errorf("%+v: cannot determine replication lag", this.Key)
}
if !this.HasReasonableMaintenanceReplicationLag() {
return false, fmt.Errorf("%+v: lags too much", this.Key)
Expand Down Expand Up @@ -515,10 +516,10 @@ func (this *Instance) LagStatusString() string {
if this.IsReplica() && !this.SecondsBehindMaster.Valid {
return "null"
}
if this.IsReplica() && this.SlaveLagSeconds.Int64 > int64(config.Config.ReasonableMaintenanceReplicationLagSeconds) {
return fmt.Sprintf("%+vs", this.SlaveLagSeconds.Int64)
if this.IsReplica() && this.ReplicationLagSeconds.Int64 > int64(config.Config.ReasonableMaintenanceReplicationLagSeconds) {
return fmt.Sprintf("%+vs", this.ReplicationLagSeconds.Int64)
}
return fmt.Sprintf("%+vs", this.SlaveLagSeconds.Int64)
return fmt.Sprintf("%+vs", this.ReplicationLagSeconds.Int64)
}

func (this *Instance) descriptionTokens() (tokens []string) {
Expand All @@ -537,7 +538,7 @@ func (this *Instance) descriptionTokens() (tokens []string) {
}
{
extraTokens := []string{}
if this.LogBinEnabled && this.LogSlaveUpdatesEnabled {
if this.LogBinEnabled && this.LogReplicationUpdatesEnabled {
extraTokens = append(extraTokens, ">>")
}
if this.UsingGTID() || this.SupportsOracleGTID {
Expand Down
Loading