diff --git a/cmd/kar-controllers/app/generic-server.go b/cmd/kar-controllers/app/generic-server.go new file mode 100644 index 00000000..fea8bce5 --- /dev/null +++ b/cmd/kar-controllers/app/generic-server.go @@ -0,0 +1,122 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package app + +import ( + "context" + "fmt" + "net" + "net/http" + "strconv" + "time" + + logger "k8s.io/klog/v2" +) + +type ServerOption func(*Server) + +// WithTimeout sets the shutdown timeout for the server. 
+func WithTimeout(timeout time.Duration) ServerOption { + return func(s *Server) { + s.shutdownTimeout = timeout + } +} + +type Server struct { + httpServer http.Server + listener net.Listener + endpoint string + shutdownTimeout time.Duration +} + +func NewServer(port int, endpoint string, handler http.Handler, options ...ServerOption) (*Server, error) { + addr := "0" + if port != 0 { + addr = ":" + strconv.Itoa(port) + } + + listener, err := newListener(addr) + if err != nil { + return nil, err + } + + mux := http.NewServeMux() + mux.Handle(endpoint, handler) + + s := &Server{ + endpoint: endpoint, + listener: listener, + httpServer: http.Server{Handler: mux}, + shutdownTimeout: 30 * time.Second, // Default value + } + + for _, opt := range options { + opt(s) + } + + return s, nil +} + +func (s *Server) Start() (err error) { + if s.listener == nil { + logger.Infof("Serving endpoint %s is disabled", s.endpoint) + return + } + + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("serving endpoint %s failed: %v", s.endpoint, r) + } + }() + + logger.Infof("Started serving endpoint %s at %s", s.endpoint, s.listener.Addr()) + if e := s.httpServer.Serve(s.listener); e != http.ErrServerClosed { + return fmt.Errorf("serving endpoint %s failed: %v", s.endpoint, e) + } + return +} + +func (s *Server) Shutdown() error { + if s.listener == nil { + return nil + } + + logger.Infof("Shutting down endpoint %s at %s (gracefully waiting for %s)", s.endpoint, s.listener.Addr(), s.shutdownTimeout) + + shutdownCtx, cancel := context.WithTimeout(context.Background(), s.shutdownTimeout) + defer cancel() + + // Try graceful shutdown + if err := s.httpServer.Shutdown(shutdownCtx); err != nil { + return fmt.Errorf("failed to shutdown server gracefully: %v", err) + } + return nil +} + +// newListener creates a new TCP listener bound to the given address. 
+func newListener(addr string) (net.Listener, error) { + // Add a case to disable serving altogether + if addr == "0" { + return nil, nil + } + + listener, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to create listener: %v", err) + } + + return listener, nil +} diff --git a/cmd/kar-controllers/app/metrics/prom-metrics.go b/cmd/kar-controllers/app/metrics/prom-metrics.go new file mode 100644 index 00000000..273718e1 --- /dev/null +++ b/cmd/kar-controllers/app/metrics/prom-metrics.go @@ -0,0 +1,26 @@ +package metrics + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +// Global Prometheus Registry +var globalPromRegistry = prometheus.NewRegistry() + +// PrometheusHandler returns an http.Handler that serves the Prometheus metrics +func PrometheusHandler() http.Handler { + // Add Go module build info. + globalPromRegistry.MustRegister(collectors.NewBuildInfoCollector()) + globalPromRegistry.MustRegister(collectors.NewGoCollector()) + globalPromRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + + handlerOpts := promhttp.HandlerOpts{ + ErrorHandling: promhttp.HTTPErrorOnError, + } + + return promhttp.HandlerFor(globalPromRegistry, handlerOpts) +} \ No newline at end of file diff --git a/cmd/kar-controllers/app/options/options.go b/cmd/kar-controllers/app/options/options.go index 7d42288c..ea6a84e5 100644 --- a/cmd/kar-controllers/app/options/options.go +++ b/cmd/kar-controllers/app/options/options.go @@ -38,8 +38,9 @@ type ServerOption struct { HeadOfLineHoldingTime int QuotaEnabled bool // Controller is to evaluate quota per request QuotaRestURL string - HealthProbeListenAddr string + HealthProbeListenPort int DispatchResourceReservationTimeout int64 + MetricsListenPort int } // NewServerOption creates a new CMServer with a default config. 
@@ -64,7 +65,8 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) { fs.BoolVar(&s.QuotaEnabled, "quotaEnabled", s.QuotaEnabled, "Enable quota policy evaluation. Default is false.") fs.StringVar(&s.QuotaRestURL, "quotaURL", s.QuotaRestURL, "URL for ReST quota management. Default is none.") fs.IntVar(&s.SecurePort, "secure-port", 6443, "The port on which to serve secured, authenticated access for metrics.") - fs.StringVar(&s.HealthProbeListenAddr, "healthProbeListenAddr", ":8081", "Listen address for health probes. Defaults to ':8081'") + fs.IntVar(&s.HealthProbeListenPort, "healthProbeListenPort", 8081, "Listen port for health probes. Defaults to ':8081'") + fs.IntVar(&s.MetricsListenPort, "metricsListenPort", 9090, "Listen port for metrics. Defaults to ':9090'") fs.Int64Var(&s.DispatchResourceReservationTimeout, "dispatchResourceReservationTimeout", s.DispatchResourceReservationTimeout, "Resource reservation timeout for pods to be created once AppWrapper is dispatched, in millisecond. Defaults to '300000', 5 minutes") } diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index 4deb4ac1..6ed71eac 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -17,21 +17,29 @@ limitations under the License. 
package app import ( - "net/http" "strings" + "context" + "fmt" + "net/http" + "sync" - _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - + "golang.org/x/sync/errgroup" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "k8s.io/utils/pointer" + "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/metrics" "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options" - "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/health" + + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + + "k8s.io/utils/pointer" + + "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config" ) + func buildConfig(master, kubeconfig string) (*rest.Config, error) { if master != "" || kubeconfig != "" { return clientcmd.BuildConfigFromFlags(master, kubeconfig) @@ -39,14 +47,12 @@ func buildConfig(master, kubeconfig string) (*rest.Config, error) { return rest.InClusterConfig() } -func Run(opt *options.ServerOption) error { +func Run(ctx context.Context, opt *options.ServerOption) error { restConfig, err := buildConfig(opt.Master, opt.Kubeconfig) if err != nil { return err } - neverStop := make(chan struct{}) - restConfig.QPS = 100.0 restConfig.Burst = 200.0 @@ -62,29 +68,51 @@ func Run(opt *options.ServerOption) error { AgentConfigs: strings.Split(opt.AgentConfigs, ","), } - jobctrl := queuejob.NewJobController(restConfig, mcadConfig, extConfig) - if jobctrl == nil { - return nil - } - jobctrl.Run(neverStop) - // This call is blocking (unless an error occurs) which equates to <-neverStop - err = listenHealthProbe(opt) + g, gCtx := errgroup.WithContext(ctx) + + // metrics server + metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", metrics.PrometheusHandler()) if err != nil { return err } - return nil -} - -// Starts the health probe 
listener -func listenHealthProbe(opt *options.ServerOption) error { - handler := http.NewServeMux() - handler.Handle("/healthz", &health.Handler{}) - err := http.ListenAndServe(opt.HealthProbeListenAddr, handler) + healthServer, err := NewServer(opt.HealthProbeListenPort, "/healthz", healthHandler()) if err != nil { return err } - return nil + jobctrl := queuejob.NewJobController(restConfig, mcadConfig, extConfig) + if jobctrl == nil { + return fmt.Errorf("failed to create a job controller") + } + + wg := &sync.WaitGroup{} + wg.Add(1) + g.Go(func() error { + defer wg.Done() + jobctrl.Run(gCtx.Done()) + return nil + }) + + g.Go(metricsServer.Start) + g.Go(healthServer.Start) + + g.Go(func() error { + wg.Wait() + return metricsServer.Shutdown() + }) + + g.Go(func() error { + wg.Wait() + return healthServer.Shutdown() + }) + + return g.Wait() +} + +func healthHandler() http.Handler { + healthHandler := http.NewServeMux() + healthHandler.Handle("/healthz", &health.Handler{}) + return healthHandler } diff --git a/cmd/kar-controllers/main.go b/cmd/kar-controllers/main.go index d66c4607..d83f44ec 100644 --- a/cmd/kar-controllers/main.go +++ b/cmd/kar-controllers/main.go @@ -1,19 +1,4 @@ /* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -/* Copyright 2019, 2021 The Multi-Cluster App Dispatcher Authors. 
Licensed under the Apache License, Version 2.0 (the "License"); @@ -35,6 +20,7 @@ import ( "fmt" "os" + "k8s.io/apiserver/pkg/server" "k8s.io/klog/v2" "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app" @@ -49,8 +35,12 @@ func main() { s.AddFlags(flagSet) flag.Parse() - if err := app.Run(s); err != nil { + ctx := server.SetupSignalContext() + + // Run the server + if err := app.Run(ctx, s); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } + } diff --git a/deployment/mcad-controller/templates/deployment.yaml b/deployment/mcad-controller/templates/deployment.yaml index 44da801f..dfbd3fed 100644 --- a/deployment/mcad-controller/templates/deployment.yaml +++ b/deployment/mcad-controller/templates/deployment.yaml @@ -11,9 +11,25 @@ spec: - name: http port: 80 targetPort: 8080 + - name: metrics + port: 9090 + targetPort: 9090 selector: app: custom-metrics-apiserver --- +apiVersion: v1 +kind: Service +metadata: + name: metrics + namespace: kube-system +spec: + ports: + - name: metrics + port: 9090 + targetPort: 9090 + selector: + app: metrics +--- #{{ if .Values.configMap.quotaRestUrl }} apiVersion: v1 kind: Service @@ -260,6 +276,8 @@ spec: name: https - containerPort: 8080 name: http + - containerPort: 9090 + name: metrics volumeMounts: - mountPath: /tmp name: temp-vol