Skip to content

Watch for RayCluster CRD then start RayCluster controller #546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ rules:
- list
- update
- watch
- apiGroups:
- apiextensions.k8s.io
resources:
- customresourcedefinitions
verbs:
- get
- list
- watch
- apiGroups:
- authentication.k8s.io
resources:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ require (
github.com/project-codeflare/codeflare-common v0.0.0-20240207083912-d7a229270a0a
github.com/ray-project/kuberay/ray-operator v1.1.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
k8s.io/api v0.28.4
k8s.io/apiextensions-apiserver v0.28.4
k8s.io/apimachinery v0.28.4
k8s.io/client-go v11.0.0+incompatible
k8s.io/component-base v0.28.4
Expand Down Expand Up @@ -78,7 +80,6 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect
golang.org/x/sys v0.18.0 // indirect
Expand All @@ -92,7 +93,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.28.4 // indirect
k8s.io/kube-openapi v0.0.0-20230901164831-6c774f458599 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect
Expand Down
100 changes: 68 additions & 32 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,24 @@ import (
dsciv1 "github.com/opendatahub-io/opendatahub-operator/v2/apis/dscinitialization/v1"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/slices"

corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
retrywatch "k8s.io/client-go/tools/watch"
configv1alpha1 "k8s.io/component-base/config/v1alpha1"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
Expand Down Expand Up @@ -169,33 +174,81 @@ func main() {
exitOnError(err, cfg.KubeRay.IngressDomain)
}

go setupControllers(mgr, kubeClient, cfg, isOpenShift(ctx, kubeClient.DiscoveryClient), certsReady)

setupLog.Info("setting up health endpoints")
exitOnError(setupProbeEndpoints(mgr, cfg, certsReady), "unable to set up health check")

setupLog.Info("setting up RayCluster controller")
go waitForRayClusterAPIandSetupController(ctx, mgr, cfg, isOpenShift(ctx, kubeClient.DiscoveryClient), certsReady)

setupLog.Info("starting manager")
exitOnError(mgr.Start(ctx), "error running manager")
}

func setupControllers(mgr ctrl.Manager, dc discovery.DiscoveryInterface, cfg *config.CodeFlareOperatorConfiguration, isOpenShift bool, certsReady chan struct{}) {
// setupRayClusterController registers the RayCluster webhook and the
// RayCluster reconciler with mgr.
//
// It blocks until a receive on certsReady succeeds, so that the webhook is
// only registered once certificate generation has completed. cfg.KubeRay is
// passed to both the webhook and the reconciler; isOpenShift is stored on the
// reconciler.
//
// Any error from the webhook or controller registration is returned to the
// caller instead of terminating the process here, so the caller decides how
// to react. The webhook is registered exactly once.
func setupRayClusterController(mgr ctrl.Manager, cfg *config.CodeFlareOperatorConfiguration, isOpenShift bool, certsReady chan struct{}) error {
	setupLog.Info("Waiting for certificate generation to complete")
	<-certsReady
	setupLog.Info("Certs ready")

	if err := controllers.SetupRayClusterWebhookWithManager(mgr, cfg.KubeRay); err != nil {
		return err
	}

	rayClusterController := controllers.RayClusterReconciler{
		Client:      mgr.GetClient(),
		Scheme:      mgr.GetScheme(),
		Config:      cfg.KubeRay,
		IsOpenShift: isOpenShift,
	}
	return rayClusterController.SetupWithManager(mgr)
}

// +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=get;list;watch

func waitForRayClusterAPIandSetupController(ctx context.Context, mgr ctrl.Manager, cfg *config.CodeFlareOperatorConfiguration, isOpenShift bool, certsReady chan struct{}) {
crdClient, err := apiextensionsclientset.NewForConfig(mgr.GetConfig())
exitOnError(err, "unable to create CRD client")

crdList, err := crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
exitOnError(err, "unable to list CRDs")

if slices.ContainsFunc(crdList.Items, func(crd apiextensionsv1.CustomResourceDefinition) bool {
return crd.Name == "rayclusters.ray.io"
}) {
exitOnError(setupRayClusterController(mgr, cfg, isOpenShift, certsReady), "unable to setup RayCluster controller")
}

retryWatcher, err := retrywatch.NewRetryWatcher(crdList.ResourceVersion, &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return crdClient.ApiextensionsV1().CustomResourceDefinitions().Watch(ctx, metav1.ListOptions{})
},
})
exitOnError(err, "unable to create retry watcher")

ok, err := hasAPIResourceForGVK(dc, rayv1.GroupVersion.WithKind("RayCluster"))
if ok {
rayClusterController := controllers.RayClusterReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Config: cfg.KubeRay,
IsOpenShift: isOpenShift,
defer retryWatcher.Stop()
for {
select {
case <-ctx.Done():
return
case event := <-retryWatcher.ResultChan():
switch event.Type {
case watch.Error:
exitOnError(apierrors.FromObject(event.Object), "error watching for RayCluster API")

case watch.Added, watch.Modified:
if crd := event.Object.(*apiextensionsv1.CustomResourceDefinition); crd.Name == "rayclusters.ray.io" &&
slices.ContainsFunc(crd.Status.Conditions, func(condition apiextensionsv1.CustomResourceDefinitionCondition) bool {
return condition.Type == apiextensionsv1.Established && condition.Status == apiextensionsv1.ConditionTrue
}) {
setupLog.Info("RayCluster API installed, setting up controller")
exitOnError(setupRayClusterController(mgr, cfg, isOpenShift, certsReady), "unable to setup RayCluster controller")
return
}
}
}
exitOnError(rayClusterController.SetupWithManager(mgr), "Error setting up RayCluster controller")
} else if err != nil {
exitOnError(err, "Could not determine if RayCluster CR present on cluster.")
}
}

Expand Down Expand Up @@ -289,23 +342,6 @@ func createConfigMap(ctx context.Context, client kubernetes.Interface, ns, name
return err
}

// hasAPIResourceForGVK reports whether the discovery client lists a resource
// of the given kind under the group/version of gvk.
//
// A NotFound error from discovery is treated as "not present" and yields
// (false, nil); any other discovery error is returned unchanged.
func hasAPIResourceForGVK(dc discovery.DiscoveryInterface, gvk schema.GroupVersionKind) (bool, error) {
	groupVersion, wantKind := gvk.ToAPIVersionAndKind()

	list, err := dc.ServerResourcesForGroupVersion(groupVersion)
	if apierrors.IsNotFound(err) {
		return false, nil
	}
	if err != nil {
		return false, err
	}

	for i := range list.APIResources {
		if list.APIResources[i].Kind == wantKind {
			return true, nil
		}
	}
	return false, nil
}

func namespaceOrDie() string {
// This way assumes you've set the NAMESPACE environment variable either manually, when running
// the operator standalone, or using the downward API, when running the operator in-cluster.
Expand Down