From a02e9b95ea7ce153b180ca1c9149c0c4604c4f34 Mon Sep 17 00:00:00 2001 From: ChristianZaccaria Date: Thu, 21 Mar 2024 07:47:37 +0000 Subject: [PATCH 1/5] WIP - Creation/Deletion of ingress and routes in RayCluster Controller --- go.mod | 2 +- pkg/controllers/raycluster_controller.go | 285 ++++++++++++++++++++--- 2 files changed, 258 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index 93fa2c9fc..fae85661b 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/project-codeflare/codeflare-operator go 1.20 require ( + github.com/go-logr/logr v1.2.4 github.com/onsi/ginkgo/v2 v2.11.0 github.com/onsi/gomega v1.27.10 github.com/openshift/api v0.0.0-20230213134911-7ba313770556 @@ -42,7 +43,6 @@ require ( github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect - github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zapr v1.2.4 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect diff --git a/pkg/controllers/raycluster_controller.go b/pkg/controllers/raycluster_controller.go index 9e45bf78d..e6157e0ba 100644 --- a/pkg/controllers/raycluster_controller.go +++ b/pkg/controllers/raycluster_controller.go @@ -22,18 +22,28 @@ import ( "crypto/sha1" "encoding/base64" + "fmt" + "strconv" + "strings" + + "github.com/go-logr/logr" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" corev1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" coreapply "k8s.io/client-go/applyconfigurations/core/v1" v1 "k8s.io/client-go/applyconfigurations/meta/v1" + networkingv1ac "k8s.io/client-go/applyconfigurations/networking/v1" rbacapply "k8s.io/client-go/applyconfigurations/rbac/v1" + "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -53,12 +63,14 @@ type RayClusterReconciler struct { } const ( - requeueTime = 10 - controllerName = "codeflare-raycluster-controller" - oAuthFinalizer = "ray.openshift.ai/oauth-finalizer" - oAuthServicePort = 443 - oAuthServicePortName = "oauth-proxy" - logRequeueing = "requeueing" + requeueTime = 10 + controllerName = "codeflare-raycluster-controller" + oAuthFinalizer = "ray.openshift.ai/oauth-finalizer" + oAuthServicePort = 443 + oAuthServicePortName = "oauth-proxy" + oauthAnnotation = "codeflare.dev/oauth" + RegularServicePortName = "dashboard" + logRequeueing = "requeueing" ) var ( @@ -97,6 +109,10 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, client.IgnoreNotFound(err) } + isLocalInteractive := annotationBoolVal(logger, &cluster, "sdk.codeflare.dev/local_interactive") + isOpenShift, ingressHost := getClusterType(logger, r.kubeClient, &cluster) + ingressDomain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] + if cluster.ObjectMeta.DeletionTimestamp.IsZero() { if !controllerutil.ContainsFinalizer(&cluster, oAuthFinalizer) { logger.Info("Add a finalizer", "finalizer", oAuthFinalizer) @@ -130,34 +146,81 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } - _, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredClusterRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) - if err != nil { - logger.Error(err, "Failed to update OAuth Route") - } + if cluster.Status.State != "suspended" && annotationBoolVal(logger, &cluster, oauthAnnotation) && isOpenShift { + logger.Info("Creating OAuth Objects") + _, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredClusterRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to update OAuth Route") + } - _, err = r.kubeClient.CoreV1().Secrets(cluster.Namespace).Apply(ctx, desiredOAuthSecret(&cluster, r), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) - if err != nil { - logger.Error(err, "Failed to create OAuth Secret") - } + _, err = r.kubeClient.CoreV1().Secrets(cluster.Namespace).Apply(ctx, desiredOAuthSecret(&cluster, r), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to create OAuth Secret") + } - _, err = r.kubeClient.CoreV1().Services(cluster.Namespace).Apply(ctx, desiredOAuthService(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) - if err != nil { - logger.Error(err, "Failed to update OAuth Service") - } + _, err = r.kubeClient.CoreV1().Services(cluster.Namespace).Apply(ctx, desiredOAuthService(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to update OAuth Service") + } - _, err = r.kubeClient.CoreV1().ServiceAccounts(cluster.Namespace).Apply(ctx, desiredServiceAccount(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) - if err != nil { - logger.Error(err, "Failed to update OAuth ServiceAccount") - } + _, err = r.kubeClient.CoreV1().ServiceAccounts(cluster.Namespace).Apply(ctx, desiredServiceAccount(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to update OAuth ServiceAccount") + } - _, err = r.kubeClient.RbacV1().ClusterRoleBindings().Apply(ctx, desiredOAuthClusterRoleBinding(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) - if err != nil { - logger.Error(err, "Failed to update OAuth ClusterRoleBinding") + _, err = r.kubeClient.RbacV1().ClusterRoleBindings().Apply(ctx, desiredOAuthClusterRoleBinding(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to update OAuth ClusterRoleBinding") + } + + } else if cluster.Status.State != "suspended" && !annotationBoolVal(logger, &cluster, oauthAnnotation) && isOpenShift { + logger.Info("Creating Dashboard Route") + _, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, createRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to update Dashboard Route") + } + if isLocalInteractive && ingressDomain != "" { + logger.Info("Creating RayClient Route") + _, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, createRayClientRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to update RayClient Route") + } + } + return ctrl.Result{}, nil + + } else if cluster.Status.State != "suspended" && !annotationBoolVal(logger, &cluster, oauthAnnotation) && !isOpenShift { + logger.Info("Creating Dashboard Ingress") + _, err := r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, createIngressApplyConfiguration(&cluster, ingressHost), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + // This log is info level since errors are not fatal and are expected + logger.Info("WARN: Failed to update Dashboard Ingress", "error", err.Error(), logRequeueing, true) + } + if isLocalInteractive && ingressDomain != "" { + logger.Info("Creating RayClient Ingress") + _, err := r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, createRayClientIngress(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to update RayClient Ingress") + } + } + return ctrl.Result{}, nil } return ctrl.Result{}, nil } +func annotationBoolVal(logger logr.Logger, cluster *rayv1.RayCluster, annotation string) bool { + val := cluster.ObjectMeta.Annotations[annotation] + boolVal, err := strconv.ParseBool(val) + if err != nil { + logger.Error(err, "Could not convert", annotation, "value to bool", val) + } + if boolVal { + return true + } else { + return false + } +} + func crbNameFromCluster(cluster *rayv1.RayCluster) string { return cluster.Name + "-" + cluster.Namespace + "-auth" // NOTE: potential naming conflicts ie {name: foo, ns: bar-baz} and {name: foo-bar, ns: baz} } @@ -193,19 +256,23 @@ func desiredServiceAccount(cluster *rayv1.RayCluster) *coreapply.ServiceAccountA WithAnnotations(map[string]string{ "serviceaccounts.openshift.io/oauth-redirectreference.first": "" + `{"kind":"OAuthRedirectReference","apiVersion":"v1",` + - `"reference":{"kind":"Route","name":"` + routeNameFromCluster(cluster) + `"}}`, + `"reference":{"kind":"Route","name":"` + dashboardNameFromCluster(cluster) + `"}}`, }). WithOwnerReferences( v1.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion), ) } -func routeNameFromCluster(cluster *rayv1.RayCluster) string { +func dashboardNameFromCluster(cluster *rayv1.RayCluster) string { return "ray-dashboard-" + cluster.Name } +func rayClientNameFromCluster(cluster *rayv1.RayCluster) string { + return "rayclient-" + cluster.Name +} + func desiredClusterRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration { - return routeapply.Route(routeNameFromCluster(cluster), cluster.Namespace). + return routeapply.Route(dashboardNameFromCluster(cluster), cluster.Namespace). WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). WithSpec(routeapply.RouteSpec(). WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(oauthServiceNameFromCluster(cluster))). @@ -283,3 +350,165 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&rayv1.RayCluster{}). Complete(r) } + +func serviceNameFromCluster(cluster *rayv1.RayCluster) string { + return cluster.Name + "-head-svc" +} + +func createRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration { + return routeapply.Route(dashboardNameFromCluster(cluster), cluster.Namespace). + WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). + WithSpec(routeapply.RouteSpec(). + WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(serviceNameFromCluster(cluster))). + WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString(RegularServicePortName))). + WithTLS(routeapply.TLSConfig(). + WithTermination("edge")), + ). + WithOwnerReferences( + v1.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion), + ) +} + +func createRayClientRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration { + ingress_domain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] + return routeapply.Route(rayClientNameFromCluster(cluster), cluster.Namespace). + WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). + WithSpec(routeapply.RouteSpec(). + WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace + "." + ingress_domain). + WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(serviceNameFromCluster(cluster)).WithWeight(100)). + WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString("client"))). + WithTLS(routeapply.TLSConfig().WithTermination("passthrough")), + ). + WithOwnerReferences( + v1.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion), + ) +} + +// Create an Ingress object for the RayCluster +func createRayClientIngress(cluster *rayv1.RayCluster) *networkingv1ac.IngressApplyConfiguration { + ingress_domain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] + return networkingv1ac.Ingress(rayClientNameFromCluster(cluster), cluster.Namespace). + WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). + WithAnnotations(map[string]string{ + "nginx.ingress.kubernetes.io/rewrite-target": "/", + "nginx.ingress.kubernetes.io/ssl-redirect": "true", + "nginx.ingress.kubernetes.io/ssl-passthrough": "true", + }). + WithOwnerReferences(v1.OwnerReference(). + WithAPIVersion(cluster.APIVersion). + WithKind(cluster.Kind). + WithName(cluster.Name). + WithUID(types.UID(cluster.UID))). + WithSpec(networkingv1ac.IngressSpec(). + WithIngressClassName("nginx"). + WithRules(networkingv1ac.IngressRule(). + WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace + "." + ingress_domain). + WithHTTP(networkingv1ac.HTTPIngressRuleValue(). + WithPaths(networkingv1ac.HTTPIngressPath(). + WithPath("/"). + WithPathType(networkingv1.PathTypeImplementationSpecific). + WithBackend(networkingv1ac.IngressBackend(). + WithService(networkingv1ac.IngressServiceBackend(). + WithName(serviceNameFromCluster(cluster)). + WithPort(networkingv1ac.ServiceBackendPort(). + WithNumber(10001), + ), + ), + ), + ), + ), + ), + ) + // Optionally, add TLS configuration here if needed +} + +// Create an Ingress object for the RayCluster +func createIngressApplyConfiguration(cluster *rayv1.RayCluster, ingressHost string) *networkingv1ac.IngressApplyConfiguration { + return networkingv1ac.Ingress(dashboardNameFromCluster(cluster), cluster.Namespace). + WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). + WithOwnerReferences(v1.OwnerReference(). + WithAPIVersion(cluster.APIVersion). + WithKind(cluster.Kind). + WithName(cluster.Name). + WithUID(types.UID(cluster.UID))). + WithSpec(networkingv1ac.IngressSpec(). + WithRules(networkingv1ac.IngressRule(). + WithHost(ingressHost). // kind host name or ingress_domain + WithHTTP(networkingv1ac.HTTPIngressRuleValue(). + WithPaths(networkingv1ac.HTTPIngressPath(). + WithPath("/"). + WithPathType(networkingv1.PathTypePrefix). + WithBackend(networkingv1ac.IngressBackend(). + WithService(networkingv1ac.IngressServiceBackend(). + WithName(serviceNameFromCluster(cluster)). + WithPort(networkingv1ac.ServiceBackendPort(). + WithName(RegularServicePortName), + ), + ), + ), + ), + ), + ), + ) + // Optionally, add TLS configuration here if needed +} + +// isOnKindCluster checks if the current cluster is a KinD cluster. +// It searches for a node with a label commonly used by KinD clusters. +func isOnKindCluster(clientset *kubernetes.Clientset) (bool, error) { + nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ + LabelSelector: "kubernetes.io/hostname=kind-control-plane", + }) + if err != nil { + return false, err + } + // If we find one or more nodes with the label, assume it's a KinD cluster. + return len(nodes.Items) > 0, nil +} + +// getDiscoveryClient returns a discovery client for the current reconciler +func getDiscoveryClient(config *rest.Config) (*discovery.DiscoveryClient, error) { + return discovery.NewDiscoveryClientForConfig(config) +} + +// Check where we are running. We are trying to distinguish here whether +// this is vanilla kubernetes cluster or Openshift +func getClusterType(logger logr.Logger, clientset *kubernetes.Clientset, cluster *rayv1.RayCluster) (bool, string) { + // The discovery package is used to discover APIs supported by a Kubernetes API server. + ingress_domain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] + config, err := ctrl.GetConfig() + if err == nil && config != nil { + dclient, err := getDiscoveryClient(config) + if err == nil && dclient != nil { + apiGroupList, err := dclient.ServerGroups() + if err != nil { + logger.Info("Error while querying ServerGroups, assuming we're on Vanilla Kubernetes") + return false, "" + } else { + for i := 0; i < len(apiGroupList.Groups); i++ { + if strings.HasSuffix(apiGroupList.Groups[i].Name, ".openshift.io") { + logger.Info("We detected being on OpenShift!") + return true, "" + } + } + onKind, _ := isOnKindCluster(clientset) + if onKind && ingress_domain == "" { + logger.Info("We detected being on a KinD cluster!") + return false, "kind" + } else { + logger.Info("We detected being on Vanilla Kubernetes!") + return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingress_domain) + } + } + } else { + logger.Info("Cannot retrieve a DiscoveryClient, assuming we're on Vanilla Kubernetes") + return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingress_domain) + } + } else { + logger.Info("Cannot retrieve config, assuming we're on Vanilla Kubernetes") + return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingress_domain) + } +} + +// No more ingress_options - Removing completely. +// What to do about ingress_domain? Needed for local_interactive? From 9f583dc8aa0eb3b23486bfc6152f6a1021f8f73a Mon Sep 17 00:00:00 2001 From: ChristianZaccaria Date: Wed, 3 Apr 2024 18:58:48 +0100 Subject: [PATCH 2/5] Always run RayCluster Controller --- main.go | 2 +- pkg/controllers/raycluster_controller.go | 35 ++++++++++++++---------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/main.go b/main.go index 05a53f074..30122002b 100644 --- a/main.go +++ b/main.go @@ -188,7 +188,7 @@ func main() { } v, err := HasAPIResourceForGVK(kubeClient.DiscoveryClient, rayv1.GroupVersion.WithKind("RayCluster")) - if v && *cfg.KubeRay.RayDashboardOAuthEnabled { + if v { rayClusterController := controllers.RayClusterReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()} exitOnError(rayClusterController.SetupWithManager(mgr), "Error setting up RayCluster controller") } else if err != nil { diff --git a/pkg/controllers/raycluster_controller.go b/pkg/controllers/raycluster_controller.go index e6157e0ba..7ccb02e60 100644 --- a/pkg/controllers/raycluster_controller.go +++ b/pkg/controllers/raycluster_controller.go @@ -27,6 +27,7 @@ import ( "strings" "github.com/go-logr/logr" + "github.com/project-codeflare/codeflare-operator/pkg/config" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" corev1 "k8s.io/api/core/v1" @@ -63,19 +64,19 @@ type RayClusterReconciler struct { } const ( - requeueTime = 10 - controllerName = "codeflare-raycluster-controller" - oAuthFinalizer = "ray.openshift.ai/oauth-finalizer" - oAuthServicePort = 443 - oAuthServicePortName = "oauth-proxy" - oauthAnnotation = "codeflare.dev/oauth" - RegularServicePortName = "dashboard" - logRequeueing = "requeueing" + requeueTime = 10 + controllerName = "codeflare-raycluster-controller" + oAuthFinalizer = "ray.openshift.ai/oauth-finalizer" + oAuthServicePort = 443 + oAuthServicePortName = "oauth-proxy" + regularServicePortName = "dashboard" + logRequeueing = "requeueing" ) var ( deletePolicy = metav1.DeletePropagationForeground deleteOptions = client.DeleteOptions{PropagationPolicy: &deletePolicy} + configInstance *config.CodeFlareOperatorConfiguration ) // +kubebuilder:rbac:groups=ray.io,resources=rayclusters,verbs=get;list;watch;create;update;patch;delete @@ -146,7 +147,7 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } - if cluster.Status.State != "suspended" && annotationBoolVal(logger, &cluster, oauthAnnotation) && isOpenShift { + if cluster.Status.State != "suspended" && isRayDashboardOAuthEnabled() && isOpenShift { logger.Info("Creating OAuth Objects") _, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredClusterRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) if err != nil { @@ -173,7 +174,7 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) logger.Error(err, "Failed to update OAuth ClusterRoleBinding") } - } else if cluster.Status.State != "suspended" && !annotationBoolVal(logger, &cluster, oauthAnnotation) && isOpenShift { + } else if cluster.Status.State != "suspended" && !isRayDashboardOAuthEnabled() && isOpenShift { logger.Info("Creating Dashboard Route") _, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, createRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) if err != nil { @@ -188,7 +189,7 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } return ctrl.Result{}, nil - } else if cluster.Status.State != "suspended" && !annotationBoolVal(logger, &cluster, oauthAnnotation) && !isOpenShift { + } else if cluster.Status.State != "suspended" && !isRayDashboardOAuthEnabled() && !isOpenShift { logger.Info("Creating Dashboard Ingress") _, err := r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, createIngressApplyConfiguration(&cluster, ingressHost), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) if err != nil { @@ -360,7 +361,7 @@ func createRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). WithSpec(routeapply.RouteSpec(). WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(serviceNameFromCluster(cluster))). - WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString(RegularServicePortName))). + WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString(regularServicePortName))). WithTLS(routeapply.TLSConfig(). WithTermination("edge")), ). @@ -442,7 +443,7 @@ func createIngressApplyConfiguration(cluster *rayv1.RayCluster, ingressHost stri WithService(networkingv1ac.IngressServiceBackend(). WithName(serviceNameFromCluster(cluster)). WithPort(networkingv1ac.ServiceBackendPort(). - WithName(RegularServicePortName), + WithName(regularServicePortName), ), ), ), @@ -510,5 +511,9 @@ func getClusterType(logger logr.Logger, clientset *kubernetes.Clientset, cluster } } -// No more ingress_options - Removing completely. -// What to do about ingress_domain? Needed for local_interactive? +func isRayDashboardOAuthEnabled() bool { + if configInstance.KubeRay != nil && configInstance.KubeRay.RayDashboardOAuthEnabled != nil { + return *configInstance.KubeRay.RayDashboardOAuthEnabled + } + return true +} From 321ef042de67a96f4f5ede335174859936f6071a Mon Sep 17 00:00:00 2001 From: ChristianZaccaria Date: Wed, 3 Apr 2024 19:08:00 +0100 Subject: [PATCH 3/5] Add support file for RC Controller --- pkg/controllers/raycluster_controller.go | 188 --------------------- pkg/controllers/support.go | 201 +++++++++++++++++++++++ 2 files changed, 201 insertions(+), 188 deletions(-) create mode 100644 pkg/controllers/support.go diff --git a/pkg/controllers/raycluster_controller.go b/pkg/controllers/raycluster_controller.go index 7ccb02e60..f388b5adb 100644 --- a/pkg/controllers/raycluster_controller.go +++ b/pkg/controllers/raycluster_controller.go @@ -22,29 +22,19 @@ import ( "crypto/sha1" "encoding/base64" - "fmt" - "strconv" - "strings" - - "github.com/go-logr/logr" "github.com/project-codeflare/codeflare-operator/pkg/config" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" corev1 "k8s.io/api/core/v1" - networkingv1 "k8s.io/api/networking/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" coreapply "k8s.io/client-go/applyconfigurations/core/v1" v1 "k8s.io/client-go/applyconfigurations/meta/v1" - networkingv1ac "k8s.io/client-go/applyconfigurations/networking/v1" rbacapply "k8s.io/client-go/applyconfigurations/rbac/v1" - "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -209,19 +199,6 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } -func annotationBoolVal(logger logr.Logger, cluster *rayv1.RayCluster, annotation string) bool { - val := cluster.ObjectMeta.Annotations[annotation] - boolVal, err := strconv.ParseBool(val) - if err != nil { - logger.Error(err, "Could not convert", annotation, "value to bool", val) - } - if boolVal { - return true - } else { - return false - } -} - func crbNameFromCluster(cluster *rayv1.RayCluster) string { return cluster.Name + "-" + cluster.Namespace + "-auth" // NOTE: potential naming conflicts ie {name: foo, ns: bar-baz} and {name: foo-bar, ns: baz} } @@ -352,168 +329,3 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func serviceNameFromCluster(cluster *rayv1.RayCluster) string { - return cluster.Name + "-head-svc" -} - -func createRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration { - return routeapply.Route(dashboardNameFromCluster(cluster), cluster.Namespace). - WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). - WithSpec(routeapply.RouteSpec(). - WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(serviceNameFromCluster(cluster))). - WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString(regularServicePortName))). - WithTLS(routeapply.TLSConfig(). - WithTermination("edge")), - ). - WithOwnerReferences( - v1.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion), - ) -} - -func createRayClientRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration { - ingress_domain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] - return routeapply.Route(rayClientNameFromCluster(cluster), cluster.Namespace). - WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). - WithSpec(routeapply.RouteSpec(). - WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace + "." + ingress_domain). - WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(serviceNameFromCluster(cluster)).WithWeight(100)). - WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString("client"))). - WithTLS(routeapply.TLSConfig().WithTermination("passthrough")), - ). - WithOwnerReferences( - v1.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion), - ) -} - -// Create an Ingress object for the RayCluster -func createRayClientIngress(cluster *rayv1.RayCluster) *networkingv1ac.IngressApplyConfiguration { - ingress_domain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] - return networkingv1ac.Ingress(rayClientNameFromCluster(cluster), cluster.Namespace). - WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). - WithAnnotations(map[string]string{ - "nginx.ingress.kubernetes.io/rewrite-target": "/", - "nginx.ingress.kubernetes.io/ssl-redirect": "true", - "nginx.ingress.kubernetes.io/ssl-passthrough": "true", - }). - WithOwnerReferences(v1.OwnerReference(). - WithAPIVersion(cluster.APIVersion). - WithKind(cluster.Kind). - WithName(cluster.Name). - WithUID(types.UID(cluster.UID))). - WithSpec(networkingv1ac.IngressSpec(). - WithIngressClassName("nginx"). - WithRules(networkingv1ac.IngressRule(). - WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace + "." + ingress_domain). - WithHTTP(networkingv1ac.HTTPIngressRuleValue(). - WithPaths(networkingv1ac.HTTPIngressPath(). - WithPath("/"). - WithPathType(networkingv1.PathTypeImplementationSpecific). - WithBackend(networkingv1ac.IngressBackend(). - WithService(networkingv1ac.IngressServiceBackend(). - WithName(serviceNameFromCluster(cluster)). - WithPort(networkingv1ac.ServiceBackendPort(). - WithNumber(10001), - ), - ), - ), - ), - ), - ), - ) - // Optionally, add TLS configuration here if needed -} - -// Create an Ingress object for the RayCluster -func createIngressApplyConfiguration(cluster *rayv1.RayCluster, ingressHost string) *networkingv1ac.IngressApplyConfiguration { - return networkingv1ac.Ingress(dashboardNameFromCluster(cluster), cluster.Namespace). - WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). - WithOwnerReferences(v1.OwnerReference(). - WithAPIVersion(cluster.APIVersion). - WithKind(cluster.Kind). - WithName(cluster.Name). - WithUID(types.UID(cluster.UID))). - WithSpec(networkingv1ac.IngressSpec(). - WithRules(networkingv1ac.IngressRule(). - WithHost(ingressHost). // kind host name or ingress_domain - WithHTTP(networkingv1ac.HTTPIngressRuleValue(). - WithPaths(networkingv1ac.HTTPIngressPath(). - WithPath("/"). - WithPathType(networkingv1.PathTypePrefix). - WithBackend(networkingv1ac.IngressBackend(). - WithService(networkingv1ac.IngressServiceBackend(). - WithName(serviceNameFromCluster(cluster)). - WithPort(networkingv1ac.ServiceBackendPort(). - WithName(regularServicePortName), - ), - ), - ), - ), - ), - ), - ) - // Optionally, add TLS configuration here if needed -} - -// isOnKindCluster checks if the current cluster is a KinD cluster. -// It searches for a node with a label commonly used by KinD clusters. -func isOnKindCluster(clientset *kubernetes.Clientset) (bool, error) { - nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ - LabelSelector: "kubernetes.io/hostname=kind-control-plane", - }) - if err != nil { - return false, err - } - // If we find one or more nodes with the label, assume it's a KinD cluster. - return len(nodes.Items) > 0, nil -} - -// getDiscoveryClient returns a discovery client for the current reconciler -func getDiscoveryClient(config *rest.Config) (*discovery.DiscoveryClient, error) { - return discovery.NewDiscoveryClientForConfig(config) -} - -// Check where we are running. We are trying to distinguish here whether -// this is vanilla kubernetes cluster or Openshift -func getClusterType(logger logr.Logger, clientset *kubernetes.Clientset, cluster *rayv1.RayCluster) (bool, string) { - // The discovery package is used to discover APIs supported by a Kubernetes API server. - ingress_domain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] - config, err := ctrl.GetConfig() - if err == nil && config != nil { - dclient, err := getDiscoveryClient(config) - if err == nil && dclient != nil { - apiGroupList, err := dclient.ServerGroups() - if err != nil { - logger.Info("Error while querying ServerGroups, assuming we're on Vanilla Kubernetes") - return false, "" - } else { - for i := 0; i < len(apiGroupList.Groups); i++ { - if strings.HasSuffix(apiGroupList.Groups[i].Name, ".openshift.io") { - logger.Info("We detected being on OpenShift!") - return true, "" - } - } - onKind, _ := isOnKindCluster(clientset) - if onKind && ingress_domain == "" { - logger.Info("We detected being on a KinD cluster!") - return false, "kind" - } else { - logger.Info("We detected being on Vanilla Kubernetes!") - return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingress_domain) - } - } - } else { - logger.Info("Cannot retrieve a DiscoveryClient, assuming we're on Vanilla Kubernetes") - return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingress_domain) - } - } else { - logger.Info("Cannot retrieve config, assuming we're on Vanilla Kubernetes") - return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingress_domain) - } -} - -func isRayDashboardOAuthEnabled() bool { - if configInstance.KubeRay != nil && configInstance.KubeRay.RayDashboardOAuthEnabled != nil { - return *configInstance.KubeRay.RayDashboardOAuthEnabled - } - return true -} diff --git a/pkg/controllers/support.go b/pkg/controllers/support.go new file mode 100644 index 000000000..23de282c4 --- /dev/null +++ b/pkg/controllers/support.go @@ -0,0 +1,201 @@ +package controllers + +import ( + "context" + "fmt" + "strconv" + "strings" + + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/go-logr/logr" + routeapply "github.com/openshift/client-go/route/applyconfigurations/route/v1" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + + networkingv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + v1 "k8s.io/client-go/applyconfigurations/meta/v1" + networkingv1ac "k8s.io/client-go/applyconfigurations/networking/v1" + "k8s.io/client-go/discovery" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +func serviceNameFromCluster(cluster *rayv1.RayCluster) string { + return cluster.Name + "-head-svc" +} + +func createRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration { + return routeapply.Route(dashboardNameFromCluster(cluster), cluster.Namespace). + WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). + WithSpec(routeapply.RouteSpec(). + WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(serviceNameFromCluster(cluster))). + WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString(regularServicePortName))). + WithTLS(routeapply.TLSConfig(). + WithTermination("edge")), + ). + WithOwnerReferences( + v1.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion), + ) +} + +func createRayClientRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration { + ingress_domain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] + return routeapply.Route(rayClientNameFromCluster(cluster), cluster.Namespace). + WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). + WithSpec(routeapply.RouteSpec(). + WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace + "." + ingress_domain). + WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(serviceNameFromCluster(cluster)).WithWeight(100)). + WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString("client"))). + WithTLS(routeapply.TLSConfig().WithTermination("passthrough")), + ). + WithOwnerReferences( + v1.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion), + ) +} + +// Create an Ingress object for the RayCluster +func createRayClientIngress(cluster *rayv1.RayCluster) *networkingv1ac.IngressApplyConfiguration { + ingress_domain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] + return networkingv1ac.Ingress(rayClientNameFromCluster(cluster), cluster.Namespace). + WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). + WithAnnotations(map[string]string{ + "nginx.ingress.kubernetes.io/rewrite-target": "/", + "nginx.ingress.kubernetes.io/ssl-redirect": "true", + "nginx.ingress.kubernetes.io/ssl-passthrough": "true", + }). + WithOwnerReferences(v1.OwnerReference(). + WithAPIVersion(cluster.APIVersion). + WithKind(cluster.Kind). + WithName(cluster.Name). + WithUID(types.UID(cluster.UID))). + WithSpec(networkingv1ac.IngressSpec(). + WithIngressClassName("nginx"). + WithRules(networkingv1ac.IngressRule(). + WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace + "." + ingress_domain). + WithHTTP(networkingv1ac.HTTPIngressRuleValue(). + WithPaths(networkingv1ac.HTTPIngressPath(). + WithPath("/"). + WithPathType(networkingv1.PathTypeImplementationSpecific). + WithBackend(networkingv1ac.IngressBackend(). + WithService(networkingv1ac.IngressServiceBackend(). + WithName(serviceNameFromCluster(cluster)). + WithPort(networkingv1ac.ServiceBackendPort(). + WithNumber(10001), + ), + ), + ), + ), + ), + ), + ) +} + +// Create an Ingress object for the RayCluster +func createIngressApplyConfiguration(cluster *rayv1.RayCluster, ingressHost string) *networkingv1ac.IngressApplyConfiguration { + return networkingv1ac.Ingress(dashboardNameFromCluster(cluster), cluster.Namespace). + WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). + WithOwnerReferences(v1.OwnerReference(). + WithAPIVersion(cluster.APIVersion). + WithKind(cluster.Kind). + WithName(cluster.Name). + WithUID(types.UID(cluster.UID))). + WithSpec(networkingv1ac.IngressSpec(). + WithRules(networkingv1ac.IngressRule(). + WithHost(ingressHost). // kind host name or ingress_domain + WithHTTP(networkingv1ac.HTTPIngressRuleValue(). + WithPaths(networkingv1ac.HTTPIngressPath(). + WithPath("/"). + WithPathType(networkingv1.PathTypePrefix). + WithBackend(networkingv1ac.IngressBackend(). + WithService(networkingv1ac.IngressServiceBackend(). + WithName(serviceNameFromCluster(cluster)). + WithPort(networkingv1ac.ServiceBackendPort(). + WithName(regularServicePortName), + ), + ), + ), + ), + ), + ), + ) +} + +// isOnKindCluster checks if the current cluster is a KinD cluster. +// It searches for a node with a label commonly used by KinD clusters. +func isOnKindCluster(clientset *kubernetes.Clientset) (bool, error) { + nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ + LabelSelector: "kubernetes.io/hostname=kind-control-plane", + }) + if err != nil { + return false, err + } + // If we find one or more nodes with the label, assume it's a KinD cluster. + return len(nodes.Items) > 0, nil +} + +// getDiscoveryClient returns a discovery client for the current reconciler +func getDiscoveryClient(config *rest.Config) (*discovery.DiscoveryClient, error) { + return discovery.NewDiscoveryClientForConfig(config) +} + +// Check where we are running. We are trying to distinguish here whether +// this is vanilla kubernetes cluster or Openshift +func getClusterType(logger logr.Logger, clientset *kubernetes.Clientset, cluster *rayv1.RayCluster) (bool, string) { + // The discovery package is used to discover APIs supported by a Kubernetes API server. + ingress_domain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] + config, err := ctrl.GetConfig() + if err == nil && config != nil { + dclient, err := getDiscoveryClient(config) + if err == nil && dclient != nil { + apiGroupList, err := dclient.ServerGroups() + if err != nil { + logger.Info("Error while querying ServerGroups, assuming we're on Vanilla Kubernetes") + return false, "" + } else { + for i := 0; i < len(apiGroupList.Groups); i++ { + if strings.HasSuffix(apiGroupList.Groups[i].Name, ".openshift.io") { + logger.Info("We detected being on OpenShift!") + return true, "" + } + } + onKind, _ := isOnKindCluster(clientset) + if onKind && ingress_domain == "" { + logger.Info("We detected being on a KinD cluster!") + return false, "kind" + } else { + logger.Info("We detected being on Vanilla Kubernetes!") + return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingress_domain) + } + } + } else { + logger.Info("Cannot retrieve a DiscoveryClient, assuming we're on Vanilla Kubernetes") + return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingress_domain) + } + } else { + logger.Info("Cannot retrieve config, assuming we're on Vanilla Kubernetes") + return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingress_domain) + } +} + +func isRayDashboardOAuthEnabled() bool { + if configInstance.KubeRay != nil && configInstance.KubeRay.RayDashboardOAuthEnabled != nil { + return *configInstance.KubeRay.RayDashboardOAuthEnabled + } + return true +} + +func annotationBoolVal(logger logr.Logger, cluster *rayv1.RayCluster, annotation string) bool { + val := cluster.ObjectMeta.Annotations[annotation] + boolVal, err := strconv.ParseBool(val) + if err != nil { + logger.Error(err, "Could not convert", annotation, "value to bool", val) + } + if boolVal { + return true + } else { + return false + } +} From bd3f2f71f27764a77e407cd04ee6091e0cc70b81 Mon Sep 17 00:00:00 2001 From: ChristianZaccaria Date: Wed, 3 Apr 2024 19:19:23 +0100 Subject: [PATCH 4/5] Disable OAuth for e2e tests --- .github/workflows/e2e_tests.yaml | 1 + pkg/controllers/raycluster_controller.go | 8 ++++---- pkg/controllers/support.go | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/.github/workflows/e2e_tests.yaml b/.github/workflows/e2e_tests.yaml index 0fc1ec88b..2e8fdbc1f 100644 --- a/.github/workflows/e2e_tests.yaml +++ b/.github/workflows/e2e_tests.yaml @@ -60,6 +60,7 @@ jobs: run: | echo Deploying CodeFlare operator IMG="${REGISTRY_ADDRESS}"/codeflare-operator + sed -i 's/RayDashboardOAuthEnabled: pointer.Bool(true)/RayDashboardOAuthEnabled: pointer.Bool(false)/' main.go make image-push -e IMG="${IMG}" make deploy -e IMG="${IMG}" -e ENV="e2e" kubectl wait --timeout=120s --for=condition=Available=true deployment -n openshift-operators codeflare-operator-manager diff --git a/pkg/controllers/raycluster_controller.go b/pkg/controllers/raycluster_controller.go index f388b5adb..798e6fc48 100644 --- a/pkg/controllers/raycluster_controller.go +++ b/pkg/controllers/raycluster_controller.go @@ -22,7 +22,6 @@ import ( "crypto/sha1" "encoding/base64" - "github.com/project-codeflare/codeflare-operator/pkg/config" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" corev1 "k8s.io/api/core/v1" @@ -42,6 +41,8 @@ import ( routev1 "github.com/openshift/api/route/v1" routeapply "github.com/openshift/client-go/route/applyconfigurations/route/v1" routev1client "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1" + + "github.com/project-codeflare/codeflare-operator/pkg/config" ) // RayClusterReconciler reconciles a RayCluster object @@ -64,8 +65,8 @@ const ( ) var ( - deletePolicy = metav1.DeletePropagationForeground - deleteOptions = client.DeleteOptions{PropagationPolicy: &deletePolicy} + deletePolicy = metav1.DeletePropagationForeground + deleteOptions = client.DeleteOptions{PropagationPolicy: &deletePolicy} configInstance *config.CodeFlareOperatorConfiguration ) @@ -328,4 +329,3 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&rayv1.RayCluster{}). Complete(r) } - diff --git a/pkg/controllers/support.go b/pkg/controllers/support.go index 23de282c4..bdc490aa2 100644 --- a/pkg/controllers/support.go +++ b/pkg/controllers/support.go @@ -6,10 +6,7 @@ import ( "strconv" "strings" - ctrl "sigs.k8s.io/controller-runtime" - "github.com/go-logr/logr" - routeapply "github.com/openshift/client-go/route/applyconfigurations/route/v1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" networkingv1 "k8s.io/api/networking/v1" @@ -21,6 +18,9 @@ import ( "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + + routeapply "github.com/openshift/client-go/route/applyconfigurations/route/v1" ) func serviceNameFromCluster(cluster *rayv1.RayCluster) string { From a62b141cb3e0cb00e68134ee5850606330b0672b Mon Sep 17 00:00:00 2001 From: ChristianZaccaria Date: Thu, 4 Apr 2024 12:22:47 +0100 Subject: [PATCH 5/5] Add OAuth config to reconciler struct --- go.mod | 2 +- main.go | 2 +- pkg/controllers/raycluster_controller.go | 42 ++++----- pkg/controllers/support.go | 112 ++++++++++------------- 4 files changed, 69 insertions(+), 89 deletions(-) diff --git a/go.mod b/go.mod index fae85661b..93fa2c9fc 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/project-codeflare/codeflare-operator go 1.20 require ( - github.com/go-logr/logr v1.2.4 github.com/onsi/ginkgo/v2 v2.11.0 github.com/onsi/gomega v1.27.10 github.com/openshift/api v0.0.0-20230213134911-7ba313770556 @@ -43,6 +42,7 @@ require ( github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zapr v1.2.4 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect diff --git a/main.go b/main.go index 30122002b..658b55e90 100644 --- a/main.go +++ b/main.go @@ -189,7 +189,7 @@ func main() { v, err := HasAPIResourceForGVK(kubeClient.DiscoveryClient, rayv1.GroupVersion.WithKind("RayCluster")) if v { - rayClusterController := controllers.RayClusterReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()} + rayClusterController := controllers.RayClusterReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Config: cfg} exitOnError(rayClusterController.SetupWithManager(mgr), "Error setting up RayCluster controller") } else if err != nil { exitOnError(err, "Could not determine if RayCluster CR present on cluster.") diff --git a/pkg/controllers/raycluster_controller.go b/pkg/controllers/raycluster_controller.go index 798e6fc48..23104c2ab 100644 --- a/pkg/controllers/raycluster_controller.go +++ b/pkg/controllers/raycluster_controller.go @@ -52,6 +52,7 @@ type RayClusterReconciler struct { routeClient *routev1client.RouteV1Client Scheme *runtime.Scheme CookieSalt string + Config *config.CodeFlareOperatorConfiguration } const ( @@ -60,14 +61,13 @@ const ( oAuthFinalizer = "ray.openshift.ai/oauth-finalizer" oAuthServicePort = 443 oAuthServicePortName = "oauth-proxy" - regularServicePortName = "dashboard" + ingressServicePortName = "dashboard" logRequeueing = "requeueing" ) var ( - deletePolicy = metav1.DeletePropagationForeground - deleteOptions = client.DeleteOptions{PropagationPolicy: &deletePolicy} - configInstance *config.CodeFlareOperatorConfiguration + deletePolicy = metav1.DeletePropagationForeground + deleteOptions = client.DeleteOptions{PropagationPolicy: &deletePolicy} ) // +kubebuilder:rbac:groups=ray.io,resources=rayclusters,verbs=get;list;watch;create;update;patch;delete @@ -101,9 +101,9 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, client.IgnoreNotFound(err) } - isLocalInteractive := annotationBoolVal(logger, &cluster, "sdk.codeflare.dev/local_interactive") - isOpenShift, ingressHost := getClusterType(logger, r.kubeClient, &cluster) - ingressDomain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] + isLocalInteractive := annotationBoolVal(ctx, &cluster, "sdk.codeflare.dev/local_interactive", false) + ingressDomain := "" // FIX - CFO will retrieve it. + isOpenShift, ingressHost := getClusterType(ctx, r.kubeClient, &cluster, ingressDomain) if cluster.ObjectMeta.DeletionTimestamp.IsZero() { if !controllerutil.ContainsFinalizer(&cluster, oAuthFinalizer) { @@ -138,63 +138,63 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } - if cluster.Status.State != "suspended" && isRayDashboardOAuthEnabled() && isOpenShift { + if cluster.Status.State != "suspended" && r.isRayDashboardOAuthEnabled() && isOpenShift { logger.Info("Creating OAuth Objects") _, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredClusterRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) if err != nil { logger.Error(err, "Failed to update OAuth Route") + return ctrl.Result{RequeueAfter: requeueTime}, err } _, err = r.kubeClient.CoreV1().Secrets(cluster.Namespace).Apply(ctx, desiredOAuthSecret(&cluster, r), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) if err != nil { logger.Error(err, "Failed to create OAuth Secret") + return ctrl.Result{RequeueAfter: requeueTime}, err } _, err = r.kubeClient.CoreV1().Services(cluster.Namespace).Apply(ctx, desiredOAuthService(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) if err != nil { logger.Error(err, "Failed to update OAuth Service") + return ctrl.Result{RequeueAfter: requeueTime}, err } _, err = r.kubeClient.CoreV1().ServiceAccounts(cluster.Namespace).Apply(ctx, desiredServiceAccount(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) if err != nil { logger.Error(err, "Failed to update OAuth ServiceAccount") + return ctrl.Result{RequeueAfter: requeueTime}, err } _, err = r.kubeClient.RbacV1().ClusterRoleBindings().Apply(ctx, desiredOAuthClusterRoleBinding(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) if err != nil { logger.Error(err, "Failed to update OAuth ClusterRoleBinding") + return ctrl.Result{RequeueAfter: requeueTime}, err } - } else if cluster.Status.State != "suspended" && !isRayDashboardOAuthEnabled() && isOpenShift { - logger.Info("Creating Dashboard Route") - _, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, createRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) - if err != nil { - logger.Error(err, "Failed to update Dashboard Route") - } - if isLocalInteractive && ingressDomain != "" { + if isLocalInteractive { logger.Info("Creating RayClient Route") - _, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, createRayClientRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + _, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredRayClientRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) if err != nil { logger.Error(err, "Failed to update RayClient Route") + return ctrl.Result{RequeueAfter: requeueTime}, err } } - return ctrl.Result{}, nil - } else if cluster.Status.State != "suspended" && !isRayDashboardOAuthEnabled() && !isOpenShift { + } else if cluster.Status.State != "suspended" && !r.isRayDashboardOAuthEnabled() && !isOpenShift { logger.Info("Creating Dashboard Ingress") - _, err := r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, createIngressApplyConfiguration(&cluster, ingressHost), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + _, err := r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, desiredClusterIngress(&cluster, ingressHost), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) if err != nil { // This log is info level since errors are not fatal and are expected logger.Info("WARN: Failed to update Dashboard Ingress", "error", err.Error(), logRequeueing, true) + return ctrl.Result{RequeueAfter: requeueTime}, err } if isLocalInteractive && ingressDomain != "" { logger.Info("Creating RayClient Ingress") - _, err := r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, createRayClientIngress(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + _, err := r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, desiredRayClientIngress(&cluster, ingressDomain), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) if err != nil { logger.Error(err, "Failed to update RayClient Ingress") + return ctrl.Result{RequeueAfter: requeueTime}, err } } - return ctrl.Result{}, nil } return ctrl.Result{}, nil diff --git a/pkg/controllers/support.go b/pkg/controllers/support.go index bdc490aa2..348cd03de 100644 --- a/pkg/controllers/support.go +++ b/pkg/controllers/support.go @@ -6,7 +6,6 @@ import ( "strconv" "strings" - "github.com/go-logr/logr" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" networkingv1 "k8s.io/api/networking/v1" @@ -27,26 +26,11 @@ func serviceNameFromCluster(cluster *rayv1.RayCluster) string { return cluster.Name + "-head-svc" } -func createRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration { - return routeapply.Route(dashboardNameFromCluster(cluster), cluster.Namespace). - WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). - WithSpec(routeapply.RouteSpec(). - WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(serviceNameFromCluster(cluster))). - WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString(regularServicePortName))). - WithTLS(routeapply.TLSConfig(). - WithTermination("edge")), - ). - WithOwnerReferences( - v1.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion), - ) -} - -func createRayClientRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration { - ingress_domain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] +func desiredRayClientRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration { return routeapply.Route(rayClientNameFromCluster(cluster), cluster.Namespace). WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). WithSpec(routeapply.RouteSpec(). - WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace + "." + ingress_domain). + WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace). WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(serviceNameFromCluster(cluster)).WithWeight(100)). WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString("client"))). WithTLS(routeapply.TLSConfig().WithTermination("passthrough")), @@ -57,8 +41,7 @@ func createRayClientRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfi } // Create an Ingress object for the RayCluster -func createRayClientIngress(cluster *rayv1.RayCluster) *networkingv1ac.IngressApplyConfiguration { - ingress_domain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] +func desiredRayClientIngress(cluster *rayv1.RayCluster, ingressDomain string) *networkingv1ac.IngressApplyConfiguration { return networkingv1ac.Ingress(rayClientNameFromCluster(cluster), cluster.Namespace). WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). WithAnnotations(map[string]string{ @@ -74,7 +57,7 @@ func createRayClientIngress(cluster *rayv1.RayCluster) *networkingv1ac.IngressAp WithSpec(networkingv1ac.IngressSpec(). WithIngressClassName("nginx"). WithRules(networkingv1ac.IngressRule(). - WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace + "." + ingress_domain). + WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace + "." + ingressDomain). WithHTTP(networkingv1ac.HTTPIngressRuleValue(). WithPaths(networkingv1ac.HTTPIngressPath(). WithPath("/"). @@ -94,7 +77,7 @@ func createRayClientIngress(cluster *rayv1.RayCluster) *networkingv1ac.IngressAp } // Create an Ingress object for the RayCluster -func createIngressApplyConfiguration(cluster *rayv1.RayCluster, ingressHost string) *networkingv1ac.IngressApplyConfiguration { +func desiredClusterIngress(cluster *rayv1.RayCluster, ingressHost string) *networkingv1ac.IngressApplyConfiguration { return networkingv1ac.Ingress(dashboardNameFromCluster(cluster), cluster.Namespace). WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). WithOwnerReferences(v1.OwnerReference(). @@ -104,7 +87,7 @@ func createIngressApplyConfiguration(cluster *rayv1.RayCluster, ingressHost stri WithUID(types.UID(cluster.UID))). WithSpec(networkingv1ac.IngressSpec(). WithRules(networkingv1ac.IngressRule(). - WithHost(ingressHost). // kind host name or ingress_domain + WithHost(ingressHost). // KinD hostname or ingressDomain WithHTTP(networkingv1ac.HTTPIngressRuleValue(). WithPaths(networkingv1ac.HTTPIngressPath(). WithPath("/"). @@ -113,7 +96,7 @@ func createIngressApplyConfiguration(cluster *rayv1.RayCluster, ingressHost stri WithService(networkingv1ac.IngressServiceBackend(). WithName(serviceNameFromCluster(cluster)). WithPort(networkingv1ac.ServiceBackendPort(). - WithName(regularServicePortName), + WithName(ingressServicePortName), ), ), ), @@ -143,59 +126,56 @@ func getDiscoveryClient(config *rest.Config) (*discovery.DiscoveryClient, error) // Check where we are running. We are trying to distinguish here whether // this is vanilla kubernetes cluster or Openshift -func getClusterType(logger logr.Logger, clientset *kubernetes.Clientset, cluster *rayv1.RayCluster) (bool, string) { +func getClusterType(ctx context.Context, clientset *kubernetes.Clientset, cluster *rayv1.RayCluster, ingressDomain string) (bool, string) { // The discovery package is used to discover APIs supported by a Kubernetes API server. - ingress_domain := cluster.ObjectMeta.Annotations["sdk.codeflare.dev/ingress_domain"] + logger := ctrl.LoggerFrom(ctx) config, err := ctrl.GetConfig() - if err == nil && config != nil { - dclient, err := getDiscoveryClient(config) - if err == nil && dclient != nil { - apiGroupList, err := dclient.ServerGroups() - if err != nil { - logger.Info("Error while querying ServerGroups, assuming we're on Vanilla Kubernetes") - return false, "" - } else { - for i := 0; i < len(apiGroupList.Groups); i++ { - if strings.HasSuffix(apiGroupList.Groups[i].Name, ".openshift.io") { - logger.Info("We detected being on OpenShift!") - return true, "" - } - } - onKind, _ := isOnKindCluster(clientset) - if onKind && ingress_domain == "" { - logger.Info("We detected being on a KinD cluster!") - return false, "kind" - } else { - logger.Info("We detected being on Vanilla Kubernetes!") - return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingress_domain) - } - } - } else { - logger.Info("Cannot retrieve a DiscoveryClient, assuming we're on Vanilla Kubernetes") - return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingress_domain) - } - } else { + if err != nil && config == nil { logger.Info("Cannot retrieve config, assuming we're on Vanilla Kubernetes") - return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingress_domain) + return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingressDomain) } + dclient, err := getDiscoveryClient(config) + if err != nil && dclient == nil { + logger.Info("Cannot retrieve a DiscoveryClient, assuming we're on Vanilla Kubernetes") + return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingressDomain) + } + apiGroupList, err := dclient.ServerGroups() + if err != nil { + logger.Info("Error while querying ServerGroups, assuming we're on Vanilla Kubernetes") + return false, "" + } + for i := 0; i < len(apiGroupList.Groups); i++ { + if strings.HasSuffix(apiGroupList.Groups[i].Name, ".openshift.io") { + logger.Info("We detected being on OpenShift!") + return true, "" + } + } + onKind, _ := isOnKindCluster(clientset) + if onKind && ingressDomain == "" { + logger.Info("We detected being on a KinD cluster!") + return false, "kind" + } + logger.Info("We detected being on Vanilla Kubernetes!") + return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingressDomain) } -func isRayDashboardOAuthEnabled() bool { - if configInstance.KubeRay != nil && configInstance.KubeRay.RayDashboardOAuthEnabled != nil { - return *configInstance.KubeRay.RayDashboardOAuthEnabled +func (r *RayClusterReconciler) isRayDashboardOAuthEnabled() bool { + if r.Config != nil && r.Config.KubeRay != nil && r.Config.KubeRay.RayDashboardOAuthEnabled != nil { + return *r.Config.KubeRay.RayDashboardOAuthEnabled } return true } -func annotationBoolVal(logger logr.Logger, cluster *rayv1.RayCluster, annotation string) bool { - val := cluster.ObjectMeta.Annotations[annotation] +func annotationBoolVal(ctx context.Context, cluster *rayv1.RayCluster, annotation string, defaultValue bool) bool { + logger := ctrl.LoggerFrom(ctx) + val, exists := cluster.ObjectMeta.Annotations[annotation] + if !exists || val == "" { + return defaultValue + } boolVal, err := strconv.ParseBool(val) if err != nil { - logger.Error(err, "Could not convert", annotation, "value to bool", val) - } - if boolVal { - return true - } else { - return false + logger.Error(err, "Could not convert annotation value to bool", "annotation", annotation, "value", val) + return defaultValue } + return boolVal }