Skip to content
This repository was archived by the owner on Feb 18, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go/agent/agent_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -298,7 +299,7 @@ func baseAgentUri(agentHostname string, agentPort int) string {
if config.Config.AgentsUseSSL {
protocol = "https"
}
uri := fmt.Sprintf("%s://%s:%d/api", protocol, agentHostname, agentPort)
uri := fmt.Sprintf("%s://%s/api", protocol, net.JoinHostPort(agentHostname, strconv.Itoa(agentPort)))
log.Debugf("orchestrator-agent uri: %s", uri)
return uri
}
Expand Down
9 changes: 6 additions & 3 deletions go/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os/user"
"regexp"
"sort"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -179,8 +180,10 @@ func Cli(command string, strict bool, instance string, destination string, owner
rawInstanceKey = nil
}

if destination != "" && !strings.Contains(destination, ":") {
destination = fmt.Sprintf("%s:%d", destination, config.Config.DefaultInstancePort)
if destination != "" {
if _, _, err := net.SplitHostPort(destination); err != nil {
destination = net.JoinHostPort(destination, strconv.Itoa(config.Config.DefaultInstancePort))
}
}
destinationKey, err := inst.ParseResolveInstanceKey(destination)
if err != nil {
Expand Down Expand Up @@ -995,7 +998,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
log.Fatale(err)
}
for _, clusterPoolInstance := range clusterPoolInstances {
fmt.Println(fmt.Sprintf("%s\t%s\t%s\t%s:%d", clusterPoolInstance.ClusterName, clusterPoolInstance.ClusterAlias, clusterPoolInstance.Pool, clusterPoolInstance.Hostname, clusterPoolInstance.Port))
fmt.Println(fmt.Sprintf("%s\t%s\t%s\t%s", clusterPoolInstance.ClusterName, clusterPoolInstance.ClusterAlias, clusterPoolInstance.Pool, net.JoinHostPort(clusterPoolInstance.Hostname, strconv.Itoa(clusterPoolInstance.Port))))
}
}
case registerCliCommand("which-heuristic-cluster-pool-instances", "Pools", `List instances of a given cluster which are in either any pool or in a specific pool`):
Expand Down
26 changes: 13 additions & 13 deletions go/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package db
import (
"database/sql"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -52,11 +54,10 @@ func getMySQLURI() string {
if mysqlURI != "" {
return mysqlURI
}
mysqlURI := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?timeout=%ds&readTimeout=%ds&interpolateParams=true",
mysqlURI := fmt.Sprintf("%s:%s@tcp(%s)/%s?timeout=%ds&readTimeout=%ds&interpolateParams=true",
config.Config.MySQLOrchestratorUser,
config.Config.MySQLOrchestratorPassword,
config.Config.MySQLOrchestratorHost,
config.Config.MySQLOrchestratorPort,
net.JoinHostPort(config.Config.MySQLOrchestratorHost, strconv.Itoa(int(config.Config.MySQLOrchestratorPort))),
config.Config.MySQLOrchestratorDatabase,
config.Config.MySQLConnectTimeoutSeconds,
config.Config.MySQLOrchestratorReadTimeoutSeconds,
Expand All @@ -80,10 +81,10 @@ func OpenTopology(host string, port int) (*sql.DB, error) {
}

func openTopology(host string, port int, readTimeout int) (db *sql.DB, err error) {
mysql_uri := fmt.Sprintf("%s:%s@tcp(%s:%d)/?timeout=%ds&readTimeout=%ds&interpolateParams=true",
mysql_uri := fmt.Sprintf("%s:%s@tcp(%s)/?timeout=%ds&readTimeout=%ds&interpolateParams=true",
config.Config.MySQLTopologyUser,
config.Config.MySQLTopologyPassword,
host, port,
net.JoinHostPort(host, strconv.Itoa(port)),
config.Config.MySQLConnectTimeoutSeconds,
readTimeout,
)
Expand All @@ -106,11 +107,10 @@ func openTopology(host string, port int, readTimeout int) (db *sql.DB, err error
}

func openOrchestratorMySQLGeneric() (db *sql.DB, fromCache bool, err error) {
uri := fmt.Sprintf("%s:%s@tcp(%s:%d)/?timeout=%ds&readTimeout=%ds&interpolateParams=true",
uri := fmt.Sprintf("%s:%s@tcp(%s)/?timeout=%ds&readTimeout=%ds&interpolateParams=true",
config.Config.MySQLOrchestratorUser,
config.Config.MySQLOrchestratorPassword,
config.Config.MySQLOrchestratorHost,
config.Config.MySQLOrchestratorPort,
net.JoinHostPort(config.Config.MySQLOrchestratorHost, strconv.Itoa(int(config.Config.MySQLOrchestratorPort))),
config.Config.MySQLConnectTimeoutSeconds,
config.Config.MySQLOrchestratorReadTimeoutSeconds,
)
Expand Down Expand Up @@ -151,8 +151,9 @@ func OpenOrchestrator() (db *sql.DB, err error) {
db, fromCache, err = sqlutils.GetDB(getMySQLURI())
if err == nil && !fromCache {
// do not show the password but do show what we connect to.
safeMySQLURI := fmt.Sprintf("%s:?@tcp(%s:%d)/%s?timeout=%ds", config.Config.MySQLOrchestratorUser,
config.Config.MySQLOrchestratorHost, config.Config.MySQLOrchestratorPort, config.Config.MySQLOrchestratorDatabase, config.Config.MySQLConnectTimeoutSeconds)
safeMySQLURI := fmt.Sprintf("%s:?@tcp(%s)/%s?timeout=%ds", config.Config.MySQLOrchestratorUser,
net.JoinHostPort(config.Config.MySQLOrchestratorHost, strconv.Itoa(int(config.Config.MySQLOrchestratorPort))),
config.Config.MySQLOrchestratorDatabase, config.Config.MySQLConnectTimeoutSeconds)
log.Debugf("Connected to orchestrator backend: %v", safeMySQLURI)
if config.Config.MySQLOrchestratorMaxPoolConnections > 0 {
log.Debugf("Orchestrator pool SetMaxOpenConns: %d", config.Config.MySQLOrchestratorMaxPoolConnections)
Expand Down Expand Up @@ -180,9 +181,8 @@ func OpenOrchestrator() (db *sql.DB, err error) {
if maxIdleConns < 10 {
maxIdleConns = 10
}
log.Infof("Connecting to backend %s:%d: maxConnections: %d, maxIdleConns: %d",
config.Config.MySQLOrchestratorHost,
config.Config.MySQLOrchestratorPort,
log.Infof("Connecting to backend %s: maxConnections: %d, maxIdleConns: %d",
net.JoinHostPort(config.Config.MySQLOrchestratorHost, strconv.Itoa(int(config.Config.MySQLOrchestratorPort))),
config.Config.MySQLOrchestratorMaxPoolConnections,
maxIdleConns)
db.SetMaxIdleConns(maxIdleConns)
Expand Down
4 changes: 3 additions & 1 deletion go/db/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package db
import (
"crypto/tls"
"fmt"
"net"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -56,7 +58,7 @@ func init() {
}

func requiresTLS(host string, port int, mysql_uri string) bool {
cacheKey := fmt.Sprintf("%s:%d", host, port)
cacheKey := net.JoinHostPort(host, strconv.Itoa(port))

if value, found := requireTLSCache.Get(cacheKey); found {
readInstanceTLSCacheCounter.Inc(1)
Expand Down
3 changes: 2 additions & 1 deletion go/http/agents_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package http

import (
"fmt"
"net"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -102,7 +103,7 @@ func (this *HttpAgentsAPI) AgentsInstances(params martini.Params, r render.Rende
agents, err := agent.ReadAgents()
hostnames := []string{}
for _, agent := range agents {
hostnames = append(hostnames, fmt.Sprintf("%s:%d", agent.Hostname, agent.MySQLPort))
hostnames = append(hostnames, net.JoinHostPort(agent.Hostname, strconv.Itoa(int(agent.MySQLPort))))
}

if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion go/http/httpbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package http

import (
"fmt"
"net"
"net/http"
"strings"

Expand Down Expand Up @@ -151,7 +152,7 @@ func getClusterHint(params map[string]string) string {
return params["clusterName"]
}
if params["host"] != "" && params["port"] != "" {
return fmt.Sprintf("%s:%s", params["host"], params["port"])
return net.JoinHostPort(params["host"], params["port"])
}
return ""
}
Expand Down
13 changes: 9 additions & 4 deletions go/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,11 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
OR master_instance.master_port = 0
OR substr(master_instance.master_host, 1, 2) = '//') AS is_master,
MIN(master_instance.is_co_master) AS is_co_master,
MIN(CONCAT(master_instance.hostname,
':',
master_instance.port) = master_instance.cluster_name) AS is_cluster_master,
MIN((CASE WHEN INSTR(master_instance.hostname, ':') > 0
THEN concat('[', master_instance.hostname, ']:', master_instance.port)
ELSE concat(master_instance.hostname, ':', master_instance.port)
END)
= master_instance.cluster_name) AS is_cluster_master,
MIN(master_instance.gtid_mode) AS gtid_mode,
COUNT(replica_instance.server_id) AS count_replicas,
IFNULL(SUM(replica_instance.last_checked <= replica_instance.last_seen),
Expand All @@ -126,7 +128,10 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
AND replica_instance.slave_sql_running = 1),
0) AS count_replicas_failing_to_connect_to_master,
MIN(master_instance.replication_depth) AS replication_depth,
GROUP_CONCAT(concat(replica_instance.Hostname, ':', replica_instance.Port)) as slave_hosts,
GROUP_CONCAT((CASE WHEN INSTR(replica_instance.hostname, ':') > 0
THEN concat('[', replica_instance.hostname, ']:', replica_instance.port)
ELSE concat(replica_instance.hostname, ':', replica_instance.port)
END)) as slave_hosts,
MIN(
master_instance.slave_sql_running = 1
AND master_instance.slave_io_running = 0
Expand Down
4 changes: 3 additions & 1 deletion go/inst/candidate_database_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package inst

import (
"fmt"
"net"
"strconv"

"github.com/github/orchestrator/go/db"
)
Expand Down Expand Up @@ -46,7 +48,7 @@ func (cdi *CandidateDatabaseInstance) WithCurrentTime() *CandidateDatabaseInstan

// String returns a string representation of the CandidateDatabaseInstance struct
func (cdi *CandidateDatabaseInstance) String() string {
return fmt.Sprintf("%s:%d %s", cdi.Hostname, cdi.Port, cdi.PromotionRule)
return fmt.Sprintf("%s %s", net.JoinHostPort(cdi.Hostname, strconv.Itoa(cdi.Port)), cdi.PromotionRule)
}

// Key returns an instance key representing this candidate
Expand Down
16 changes: 11 additions & 5 deletions go/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,14 +620,16 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
defer waitGroup.Done()
err := sqlutils.QueryRowsMap(db, `
select
substring_index(host, ':', 1) as slave_hostname
host
from
information_schema.processlist
where
command IN ('Binlog Dump', 'Binlog Dump GTID')
`,
func(m sqlutils.RowMap) error {
cname, resolveErr := ResolveHostname(m.GetString("slave_hostname"))
host := m.GetString("host")
host = host[0:strings.LastIndex(host, ":")]
cname, resolveErr := ResolveHostname(host)
if resolveErr != nil {
logReadTopologyInstanceError(instanceKey, "ResolveHostname: processlist", resolveErr)
}
Expand Down Expand Up @@ -1369,7 +1371,11 @@ func SearchInstances(searchString string) ([](*Instance), error) {
or instr(cluster_name, ?) > 0
or instr(version, ?) > 0
or instr(version_comment, ?) > 0
or instr(concat(hostname, ':', port), ?) > 0
or instr(
CASE WHEN INSTR(hostname, ':') THEN concat('[', hostname, ']:', port)
ELSE concat(hostname, ':', port)
END,
?) > 0
or instr(suggested_cluster_alias, ?) > 0
or concat(server_id, '') = ?
or concat(port, '') = ?
Expand Down Expand Up @@ -1485,8 +1491,8 @@ func ReadDowntimedInstances(clusterName string) ([](*Instance), error) {
func ReadClusterCandidateInstances(clusterName string) ([](*Instance), error) {
condition := `
cluster_name = ?
and concat(hostname, ':', port) in (
select concat(hostname, ':', port)
and concat(hostname, ' ', port) in (
select concat(hostname, ' ', port)
from candidate_database_instance
where promotion_rule in ('must', 'prefer')
)
Expand Down
3 changes: 2 additions & 1 deletion go/inst/instance_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package inst
import (
"fmt"
"github.com/github/orchestrator/go/config"
"net"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -171,7 +172,7 @@ func (this *InstanceKey) ReattachedKey() *InstanceKey {

// StringCode returns an official string representation of this key
func (this *InstanceKey) StringCode() string {
return fmt.Sprintf("%s:%d", this.Hostname, this.Port)
return net.JoinHostPort(this.Hostname, strconv.Itoa(this.Port))
}

// DisplayString returns a user-friendly string representation of this key
Expand Down
4 changes: 2 additions & 2 deletions go/inst/maintenance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ func ExpireMaintenance() error {
database_instance_maintenance
where
explicitly_bounded = 0
and concat(processing_node_hostname, ':', processing_node_token) not in (
select concat(hostname, ':', token) from node_health
and concat(processing_node_hostname, ' ', processing_node_token) not in (
select concat(hostname, ' ', token) from node_health
)
`,
)
Expand Down
5 changes: 4 additions & 1 deletion go/inst/pool_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ func ReadAllPoolInstancesSubmissions() ([]PoolInstancesSubmission, error) {
select
pool,
min(registered_at) as registered_at,
GROUP_CONCAT(concat(hostname, ':', port)) as hosts
GROUP_CONCAT(
CASE WHEN INSTR(hostname, ':') THEN concat('[', hostname, ']:', port)
ELSE concat(hostname, ':', port)
END) as hosts
from
database_instance_pool
group by
Expand Down
4 changes: 2 additions & 2 deletions go/logic/topology_recovery_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,8 @@ func AcknowledgeCrashedRecoveries() (countAcknowledgedEntries int64, err error)
whereClause := `
in_active_period = 1
and end_recovery is null
and concat(processing_node_hostname, ':', processcing_node_token) not in (
select concat(hostname, ':', token) from node_health
and concat(processing_node_hostname, ' ', processcing_node_token) not in (
select concat(hostname, ' ', token) from node_health
)
`
return acknowledgeRecoveries("orchestrator", "detected crashed recovery", true, whereClause, sqlutils.Args())
Expand Down
7 changes: 4 additions & 3 deletions go/process/health_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package process

import (
"net"
"strconv"
"time"

"fmt"
"github.com/github/orchestrator/go/config"
"github.com/github/orchestrator/go/db"
"github.com/openark/golib/log"
Expand Down Expand Up @@ -80,8 +81,8 @@ func WriteRegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) {
if config.Config.IsSQLite() {
dbBackend = config.Config.SQLite3DataFile
} else {
dbBackend = fmt.Sprintf("%s:%d", config.Config.MySQLOrchestratorHost,
config.Config.MySQLOrchestratorPort)
dbBackend = net.JoinHostPort(config.Config.MySQLOrchestratorHost,
strconv.Itoa(int(config.Config.MySQLOrchestratorPort)))
}
sqlResult, err := db.ExecOrchestrator(`
insert ignore into node_health
Expand Down
22 changes: 12 additions & 10 deletions go/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"fmt"
"math/rand"
"net"
"strings"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -105,13 +105,12 @@ func computeLeaderURI() (uri string, err error) {
}

hostname := config.Config.RaftAdvertise
listenTokens := strings.Split(config.Config.ListenAddress, ":")
if len(listenTokens) < 2 {
_, port, err := net.SplitHostPort(config.Config.ListenAddress)
if err != nil {
return uri, fmt.Errorf("computeLeaderURI: cannot determine listen port out of config.Config.ListenAddress: %+v", config.Config.ListenAddress)
}
port := listenTokens[1]

uri = fmt.Sprintf("%s://%s:%s", scheme, hostname, port)
uri = fmt.Sprintf("%s://%s", scheme, net.JoinHostPort(hostname, port))
return uri, nil
}

Expand Down Expand Up @@ -196,16 +195,19 @@ func normalizeRaftHostnameIP(host string) (string, error) {
// normalizeRaftNode attempts to make sure there's a port to the given node.
// It consults the DefaultRaftPort when there isn't
func normalizeRaftNode(node string) (string, error) {
hostPort := strings.Split(node, ":")
host, err := normalizeRaftHostnameIP(hostPort[0])
host, port, err := net.SplitHostPort(node)
if err != nil {
host = node
}
host, err = normalizeRaftHostnameIP(host)
if err != nil {
return host, err
}
if len(hostPort) > 1 {
return fmt.Sprintf("%s:%s", host, hostPort[1]), nil
if port != "" {
return net.JoinHostPort(host, port), nil
} else if config.Config.DefaultRaftPort != 0 {
// No port specified, add one
return fmt.Sprintf("%s:%d", host, config.Config.DefaultRaftPort), nil
return net.JoinHostPort(host, strconv.Itoa(config.Config.DefaultRaftPort)), nil
} else {
return host, nil
}
Expand Down
2 changes: 1 addition & 1 deletion resources/public/js/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ $(document).ready(function() {
}
$("[data-agent=hostname]").html(agent.Hostname)
$("[data-agent=hostname_search]").html(
'<a href="' + appUrl('/web/search?s=' + agent.Hostname + ':' + agent.MySQLPort) + '">' + agent.Hostname + '</a>' + '<div class="pull-right"><button class="btn btn-xs btn-success" data-command="discover" data-hostname="' + agent.Hostname + '" data-mysql-port="' + agent.MySQLPort + '">Discover</button></div>'
'<a href="' + appUrl('/web/search?s=' + joinHostPort(agent.Hostname, agent.MySQLPort)) + '">' + agent.Hostname + '</a>' + '<div class="pull-right"><button class="btn btn-xs btn-success" data-command="discover" data-hostname="' + agent.Hostname + '" data-mysql-port="' + agent.MySQLPort + '">Discover</button></div>'
);
$("[data-agent=port]").html(agent.Port)
$("[data-agent=last_submitted]").html(agent.LastSubmitted)
Expand Down
Loading