Skip to content

Refactor creation logic of ingress/routes into RayCluster Controller #493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/e2e_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ jobs:
run: |
echo Deploying CodeFlare operator
IMG="${REGISTRY_ADDRESS}"/codeflare-operator
sed -i 's/RayDashboardOAuthEnabled: pointer.Bool(true)/RayDashboardOAuthEnabled: pointer.Bool(false)/' main.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That should be done in the e2e ConfigMap instead.

make image-push -e IMG="${IMG}"
make deploy -e IMG="${IMG}" -e ENV="e2e"
kubectl wait --timeout=120s --for=condition=Available=true deployment -n openshift-operators codeflare-operator-manager
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ func main() {
}

v, err := HasAPIResourceForGVK(kubeClient.DiscoveryClient, rayv1.GroupVersion.WithKind("RayCluster"))
if v && *cfg.KubeRay.RayDashboardOAuthEnabled {
rayClusterController := controllers.RayClusterReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}
if v {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's rename that v to ok :)

rayClusterController := controllers.RayClusterReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Config: cfg}
exitOnError(rayClusterController.SetupWithManager(mgr), "Error setting up RayCluster controller")
} else if err != nil {
exitOnError(err, "Could not determine if RayCluster CR present on cluster.")
Expand Down
102 changes: 74 additions & 28 deletions pkg/controllers/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
routev1 "github.com/openshift/api/route/v1"
routeapply "github.com/openshift/client-go/route/applyconfigurations/route/v1"
routev1client "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1"

"github.com/project-codeflare/codeflare-operator/pkg/config"
)

// RayClusterReconciler reconciles a RayCluster object
Expand All @@ -50,15 +52,17 @@ type RayClusterReconciler struct {
routeClient *routev1client.RouteV1Client
Scheme *runtime.Scheme
CookieSalt string
Config *config.CodeFlareOperatorConfiguration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe only the RayCluster controller configuration could be enough here?

}

const (
requeueTime = 10
controllerName = "codeflare-raycluster-controller"
oAuthFinalizer = "ray.openshift.ai/oauth-finalizer"
oAuthServicePort = 443
oAuthServicePortName = "oauth-proxy"
logRequeueing = "requeueing"
requeueTime = 10
controllerName = "codeflare-raycluster-controller"
oAuthFinalizer = "ray.openshift.ai/oauth-finalizer"
oAuthServicePort = 443
oAuthServicePortName = "oauth-proxy"
ingressServicePortName = "dashboard"
logRequeueing = "requeueing"
)

var (
Expand Down Expand Up @@ -97,6 +101,10 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

isLocalInteractive := annotationBoolVal(ctx, &cluster, "sdk.codeflare.dev/local_interactive", false)
ingressDomain := "" // FIX - CFO will retrieve it.
isOpenShift, ingressHost := getClusterType(ctx, r.kubeClient, &cluster, ingressDomain)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking the cluster type should only be done once when the operator starts. Calling the Discovery API for each reconciliation is also not really acceptable.


if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
if !controllerutil.ContainsFinalizer(&cluster, oAuthFinalizer) {
logger.Info("Add a finalizer", "finalizer", oAuthFinalizer)
Expand Down Expand Up @@ -130,29 +138,63 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

_, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredClusterRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update OAuth Route")
}
if cluster.Status.State != "suspended" && r.isRayDashboardOAuthEnabled() && isOpenShift {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible the cluster transitions from running to suspended state. Should the resources be removed in that case?

logger.Info("Creating OAuth Objects")
_, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredClusterRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update OAuth Route")
return ctrl.Result{RequeueAfter: requeueTime}, err
}

_, err = r.kubeClient.CoreV1().Secrets(cluster.Namespace).Apply(ctx, desiredOAuthSecret(&cluster, r), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to create OAuth Secret")
}
_, err = r.kubeClient.CoreV1().Secrets(cluster.Namespace).Apply(ctx, desiredOAuthSecret(&cluster, r), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to create OAuth Secret")
return ctrl.Result{RequeueAfter: requeueTime}, err
}

_, err = r.kubeClient.CoreV1().Services(cluster.Namespace).Apply(ctx, desiredOAuthService(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update OAuth Service")
}
_, err = r.kubeClient.CoreV1().Services(cluster.Namespace).Apply(ctx, desiredOAuthService(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update OAuth Service")
return ctrl.Result{RequeueAfter: requeueTime}, err
}

_, err = r.kubeClient.CoreV1().ServiceAccounts(cluster.Namespace).Apply(ctx, desiredServiceAccount(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update OAuth ServiceAccount")
}
_, err = r.kubeClient.CoreV1().ServiceAccounts(cluster.Namespace).Apply(ctx, desiredServiceAccount(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update OAuth ServiceAccount")
return ctrl.Result{RequeueAfter: requeueTime}, err
}

_, err = r.kubeClient.RbacV1().ClusterRoleBindings().Apply(ctx, desiredOAuthClusterRoleBinding(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update OAuth ClusterRoleBinding")
_, err = r.kubeClient.RbacV1().ClusterRoleBindings().Apply(ctx, desiredOAuthClusterRoleBinding(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update OAuth ClusterRoleBinding")
return ctrl.Result{RequeueAfter: requeueTime}, err
}

if isLocalInteractive {
logger.Info("Creating RayClient Route")
_, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredRayClientRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update RayClient Route")
return ctrl.Result{RequeueAfter: requeueTime}, err
}
}

} else if cluster.Status.State != "suspended" && !r.isRayDashboardOAuthEnabled() && !isOpenShift {
logger.Info("Creating Dashboard Ingress")
_, err := r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, desiredClusterIngress(&cluster, ingressHost), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
// This log is info level since errors are not fatal and are expected
logger.Info("WARN: Failed to update Dashboard Ingress", "error", err.Error(), logRequeueing, true)
return ctrl.Result{RequeueAfter: requeueTime}, err
}
if isLocalInteractive && ingressDomain != "" {
logger.Info("Creating RayClient Ingress")
_, err := r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, desiredRayClientIngress(&cluster, ingressDomain), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update RayClient Ingress")
return ctrl.Result{RequeueAfter: requeueTime}, err
}
}
}

return ctrl.Result{}, nil
Expand Down Expand Up @@ -193,19 +235,23 @@ func desiredServiceAccount(cluster *rayv1.RayCluster) *coreapply.ServiceAccountA
WithAnnotations(map[string]string{
"serviceaccounts.openshift.io/oauth-redirectreference.first": "" +
`{"kind":"OAuthRedirectReference","apiVersion":"v1",` +
`"reference":{"kind":"Route","name":"` + routeNameFromCluster(cluster) + `"}}`,
`"reference":{"kind":"Route","name":"` + dashboardNameFromCluster(cluster) + `"}}`,
}).
WithOwnerReferences(
v1.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion),
)
}

func routeNameFromCluster(cluster *rayv1.RayCluster) string {
// dashboardNameFromCluster derives the name used for the dashboard
// Route/Ingress resources of the given RayCluster.
func dashboardNameFromCluster(cluster *rayv1.RayCluster) string {
return "ray-dashboard-" + cluster.Name
}

// rayClientNameFromCluster derives the name used for the Ray client
// Route/Ingress resources of the given RayCluster.
func rayClientNameFromCluster(cluster *rayv1.RayCluster) string {
return "rayclient-" + cluster.Name
}

func desiredClusterRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration {
return routeapply.Route(routeNameFromCluster(cluster), cluster.Namespace).
return routeapply.Route(dashboardNameFromCluster(cluster), cluster.Namespace).
WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
WithSpec(routeapply.RouteSpec().
WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(oauthServiceNameFromCluster(cluster))).
Expand Down
181 changes: 181 additions & 0 deletions pkg/controllers/support.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package controllers

import (
"context"
"fmt"
"strconv"
"strings"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"

networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
v1 "k8s.io/client-go/applyconfigurations/meta/v1"
networkingv1ac "k8s.io/client-go/applyconfigurations/networking/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"

routeapply "github.com/openshift/client-go/route/applyconfigurations/route/v1"
)

// serviceNameFromCluster returns the name of the head Service associated
// with the given RayCluster (the "<cluster>-head-svc" convention).
func serviceNameFromCluster(cluster *rayv1.RayCluster) string {
	return fmt.Sprintf("%s-head-svc", cluster.Name)
}

// desiredRayClientRoute builds the apply configuration for the TLS
// passthrough Route that exposes the Ray client port of the given
// RayCluster head service.
func desiredRayClientRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration {
	owner := v1.OwnerReference().
		WithUID(cluster.UID).
		WithName(cluster.Name).
		WithKind(cluster.Kind).
		WithAPIVersion(cluster.APIVersion)
	spec := routeapply.RouteSpec().
		WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace).
		WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(serviceNameFromCluster(cluster)).WithWeight(100)).
		WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString("client"))).
		WithTLS(routeapply.TLSConfig().WithTermination("passthrough"))
	return routeapply.Route(rayClientNameFromCluster(cluster), cluster.Namespace).
		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
		WithSpec(spec).
		WithOwnerReferences(owner)
}

// desiredRayClientIngress builds the apply configuration for an NGINX
// SSL-passthrough Ingress exposing the Ray client port (10001) of the
// given RayCluster under the provided ingress domain.
func desiredRayClientIngress(cluster *rayv1.RayCluster, ingressDomain string) *networkingv1ac.IngressApplyConfiguration {
	annotations := map[string]string{
		"nginx.ingress.kubernetes.io/rewrite-target":  "/",
		"nginx.ingress.kubernetes.io/ssl-redirect":    "true",
		"nginx.ingress.kubernetes.io/ssl-passthrough": "true",
	}
	owner := v1.OwnerReference().
		WithAPIVersion(cluster.APIVersion).
		WithKind(cluster.Kind).
		WithName(cluster.Name).
		WithUID(types.UID(cluster.UID))
	backend := networkingv1ac.IngressBackend().
		WithService(networkingv1ac.IngressServiceBackend().
			WithName(serviceNameFromCluster(cluster)).
			WithPort(networkingv1ac.ServiceBackendPort().
				WithNumber(10001)))
	rule := networkingv1ac.IngressRule().
		WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace + "." + ingressDomain).
		WithHTTP(networkingv1ac.HTTPIngressRuleValue().
			WithPaths(networkingv1ac.HTTPIngressPath().
				WithPath("/").
				WithPathType(networkingv1.PathTypeImplementationSpecific).
				WithBackend(backend)))
	return networkingv1ac.Ingress(rayClientNameFromCluster(cluster), cluster.Namespace).
		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
		WithAnnotations(annotations).
		WithOwnerReferences(owner).
		WithSpec(networkingv1ac.IngressSpec().
			WithIngressClassName("nginx").
			WithRules(rule))
}

// desiredClusterIngress builds the apply configuration for the Ingress
// that exposes the Ray dashboard of the given RayCluster at ingressHost.
func desiredClusterIngress(cluster *rayv1.RayCluster, ingressHost string) *networkingv1ac.IngressApplyConfiguration {
	owner := v1.OwnerReference().
		WithAPIVersion(cluster.APIVersion).
		WithKind(cluster.Kind).
		WithName(cluster.Name).
		WithUID(types.UID(cluster.UID))
	backend := networkingv1ac.IngressBackend().
		WithService(networkingv1ac.IngressServiceBackend().
			WithName(serviceNameFromCluster(cluster)).
			WithPort(networkingv1ac.ServiceBackendPort().
				WithName(ingressServicePortName)))
	// ingressHost is either the KinD hostname or derived from the ingress domain.
	rule := networkingv1ac.IngressRule().
		WithHost(ingressHost).
		WithHTTP(networkingv1ac.HTTPIngressRuleValue().
			WithPaths(networkingv1ac.HTTPIngressPath().
				WithPath("/").
				WithPathType(networkingv1.PathTypePrefix).
				WithBackend(backend)))
	return networkingv1ac.Ingress(dashboardNameFromCluster(cluster), cluster.Namespace).
		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
		WithOwnerReferences(owner).
		WithSpec(networkingv1ac.IngressSpec().
			WithRules(rule))
}

// isOnKindCluster reports whether the current cluster is a KinD cluster.
// It looks for a node carrying the hostname label that KinD sets on its
// control-plane node.
func isOnKindCluster(clientset *kubernetes.Clientset) (bool, error) {
	opts := metav1.ListOptions{
		LabelSelector: "kubernetes.io/hostname=kind-control-plane",
	}
	nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), opts)
	if err != nil {
		return false, err
	}
	// One or more matching nodes is taken as proof of a KinD cluster.
	return len(nodes.Items) != 0, nil
}

// getDiscoveryClient builds a Kubernetes discovery client from the given
// REST config; it is used to query the API groups the server supports.
func getDiscoveryClient(config *rest.Config) (*discovery.DiscoveryClient, error) {
return discovery.NewDiscoveryClientForConfig(config)
}

// Check where we are running. We are trying to distinguish here whether
// this is vanilla kubernetes cluster or Openshift
func getClusterType(ctx context.Context, clientset *kubernetes.Clientset, cluster *rayv1.RayCluster, ingressDomain string) (bool, string) {
// The discovery package is used to discover APIs supported by a Kubernetes API server.
logger := ctrl.LoggerFrom(ctx)
config, err := ctrl.GetConfig()
if err != nil && config == nil {
logger.Info("Cannot retrieve config, assuming we're on Vanilla Kubernetes")
return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingressDomain)
}
dclient, err := getDiscoveryClient(config)
if err != nil && dclient == nil {
logger.Info("Cannot retrieve a DiscoveryClient, assuming we're on Vanilla Kubernetes")
return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingressDomain)
}
apiGroupList, err := dclient.ServerGroups()
if err != nil {
logger.Info("Error while querying ServerGroups, assuming we're on Vanilla Kubernetes")
return false, ""
}
for i := 0; i < len(apiGroupList.Groups); i++ {
if strings.HasSuffix(apiGroupList.Groups[i].Name, ".openshift.io") {
logger.Info("We detected being on OpenShift!")
return true, ""
}
}
onKind, _ := isOnKindCluster(clientset)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That really feels like testing concerns leaking in application code. Why is it needed to explicitly check KinD?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, we don't explicitly need to check for KinD, anything that is not OpenShift could suit. The only reason that we check for KinD is for our own testing purposes. If it's true, then the ingress Host will use "kind". We could make it more generic by supplying the ingress_domain to the e2e tests, then there is no need for checking for KinD explicitly. Should I change or leave as is for now... WDYT?

if onKind && ingressDomain == "" {
logger.Info("We detected being on a KinD cluster!")
return false, "kind"
}
logger.Info("We detected being on Vanilla Kubernetes!")
return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingressDomain)
}

// isRayDashboardOAuthEnabled reports whether the OAuth proxy for the Ray
// dashboard is enabled. It defaults to true when the operator
// configuration does not say otherwise.
func (r *RayClusterReconciler) isRayDashboardOAuthEnabled() bool {
	cfg := r.Config
	if cfg == nil || cfg.KubeRay == nil || cfg.KubeRay.RayDashboardOAuthEnabled == nil {
		return true
	}
	return *cfg.KubeRay.RayDashboardOAuthEnabled
}

// annotationBoolVal reads a boolean annotation from the RayCluster.
// It returns defaultValue when the annotation is absent, empty, or cannot
// be parsed as a boolean (the parse failure is logged).
func annotationBoolVal(ctx context.Context, cluster *rayv1.RayCluster, annotation string, defaultValue bool) bool {
	logger := ctrl.LoggerFrom(ctx)
	raw, ok := cluster.ObjectMeta.Annotations[annotation]
	if !ok || raw == "" {
		return defaultValue
	}
	parsed, err := strconv.ParseBool(raw)
	if err != nil {
		logger.Error(err, "Could not convert annotation value to bool", "annotation", annotation, "value", raw)
		return defaultValue
	}
	return parsed
}