Skip to content

Commit 5df7150

Browse files
committed
add: hypershift functionality
1 parent e591d18 commit 5df7150

File tree

5 files changed

+246
-102
lines changed

5 files changed

+246
-102
lines changed

controllers/appwrapper_controller.go

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@ package controllers
1919
import (
2020
"context"
2121
"fmt"
22-
"strings"
23-
"time"
24-
22+
ocmsdk "github.com/openshift-online/ocm-sdk-go"
2523
"github.com/project-codeflare/instascale/pkg/config"
2624
arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
2725
corev1 "k8s.io/api/core/v1"
@@ -30,6 +28,8 @@ import (
3028
"k8s.io/apimachinery/pkg/runtime"
3129
"k8s.io/client-go/kubernetes"
3230
"k8s.io/klog"
31+
"strings"
32+
"time"
3333

3434
"k8s.io/apimachinery/pkg/labels"
3535
ctrl "sigs.k8s.io/controller-runtime"
@@ -38,15 +38,19 @@ import (
3838
"sigs.k8s.io/controller-runtime/pkg/log"
3939
)
4040

41+
type MachineType string
42+
4143
// AppWrapperReconciler reconciles a AppWrapper object
4244
type AppWrapperReconciler struct {
4345
client.Client
44-
Scheme *runtime.Scheme
45-
Config config.InstaScaleConfiguration
46-
kubeClient *kubernetes.Clientset
47-
ocmClusterID string
48-
ocmToken string
49-
useMachineSets bool
46+
Scheme *runtime.Scheme
47+
Config config.InstaScaleConfiguration
48+
kubeClient *kubernetes.Clientset
49+
ocmClusterID string
50+
ocmToken string
51+
ocmConnection *ocmsdk.Connection
52+
MachineType MachineType
53+
machineCheck bool
5054
}
5155

5256
var (
@@ -55,9 +59,12 @@ var (
5559
)
5660

5761
const (
58-
namespaceToList = "openshift-machine-api"
59-
minResyncPeriod = 10 * time.Minute
60-
finalizerName = "instascale.codeflare.dev/finalizer"
62+
namespaceToList = "openshift-machine-api"
63+
minResyncPeriod = 10 * time.Minute
64+
finalizerName = "instascale.codeflare.dev/finalizer"
65+
MachineTypeMachineSet MachineType = "MachineSet"
66+
MachineTypeMachinePool MachineType = "MachinePool"
67+
MachineTypeNodePool MachineType = "NodePool"
6168
)
6269

6370
// +kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers,verbs=get;list;watch;create;update;patch;delete
@@ -81,15 +88,25 @@ const (
8188
// For more details, check Reconcile and its Result here:
8289
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
8390
func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
84-
8591
_ = log.FromContext(ctx)
86-
// todo: Move the getOCMClusterID call out of reconcile loop.
92+
93+
// todo: Move the "machineCheck" locic out of reconcile loop.
8794
// Only reason we are calling it here is that the client is not able to make
8895
// calls until it is started, so SetupWithManager is not working.
89-
if !r.useMachineSets && r.ocmClusterID == "" {
90-
if err := r.getOCMClusterID(ctx); err != nil {
91-
return ctrl.Result{Requeue: true, RequeueAfter: timeFiveSeconds}, err
96+
if r.machineCheck == false {
97+
if r.MachineType != MachineTypeMachineSet && r.ocmClusterID == "" {
98+
if err := r.getOCMClusterID(); err != nil {
99+
return ctrl.Result{}, err
100+
}
101+
}
102+
hypershiftEnabled, err := r.checkHypershiftEnabled(ctx)
103+
if err != nil {
104+
return ctrl.Result{}, fmt.Errorf("error checking if hypershift is enabled: %w", err)
92105
}
106+
if hypershiftEnabled {
107+
r.MachineType = MachineTypeNodePool
108+
}
109+
r.machineCheck = true
93110
}
94111
var appwrapper arbv1.AppWrapper
95112

@@ -124,8 +141,14 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
124141

125142
demandPerInstanceType := r.discoverInstanceTypes(&appwrapper)
126143
if ocmSecretRef := r.Config.OCMSecretRef; ocmSecretRef != nil {
127-
return r.scaleMachinePool(ctx, &appwrapper, demandPerInstanceType)
144+
switch r.MachineType {
145+
case MachineTypeNodePool:
146+
return r.scaleNodePool(ctx, &appwrapper, demandPerInstanceType)
147+
case MachineTypeMachinePool:
148+
return r.scaleMachinePool(ctx, &appwrapper, demandPerInstanceType)
149+
}
128150
} else {
151+
// use MachineSets
129152
switch strings.ToLower(r.Config.MachineSetsStrategy) {
130153
case "reuse":
131154
return r.reconcileReuseMachineSet(ctx, &appwrapper, demandPerInstanceType)
@@ -142,7 +165,8 @@ func (r *AppWrapperReconciler) finalizeScalingDownMachines(ctx context.Context,
142165
} else {
143166
deletionMessage = "deleted"
144167
}
145-
if r.useMachineSets {
168+
switch r.MachineType {
169+
case MachineTypeMachineSet:
146170
switch strings.ToLower(r.Config.MachineSetsStrategy) {
147171
case "reuse":
148172
matchedAw := r.findExactMatch(ctx, appwrapper)
@@ -163,7 +187,14 @@ func (r *AppWrapperReconciler) finalizeScalingDownMachines(ctx context.Context,
163187
return err
164188
}
165189
}
166-
} else {
190+
191+
case MachineTypeNodePool:
192+
klog.Infof("Appwrapper %s scale-down node pool: %s ", deletionMessage, appwrapper.Name)
193+
if _, err := r.deleteNodePool(ctx, appwrapper); err != nil {
194+
return err
195+
}
196+
197+
case MachineTypeMachinePool:
167198
klog.Infof("Appwrapper %s scale-down machine pool: %s ", deletionMessage, appwrapper.Name)
168199
if _, err := r.deleteMachinePool(ctx, appwrapper); err != nil {
169200
return err
@@ -174,7 +205,6 @@ func (r *AppWrapperReconciler) finalizeScalingDownMachines(ctx context.Context,
174205

175206
// SetupWithManager sets up the controller with the Manager.
176207
func (r *AppWrapperReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
177-
178208
restConfig := mgr.GetConfig()
179209

180210
var err error
@@ -184,21 +214,16 @@ func (r *AppWrapperReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Ma
184214
}
185215

186216
maxScaleNodesAllowed = int(r.Config.MaxScaleoutAllowed)
187-
r.useMachineSets = true
217+
r.MachineType = MachineTypeMachineSet // default to MachineSet
188218
if ocmSecretRef := r.Config.OCMSecretRef; ocmSecretRef != nil {
189-
r.useMachineSets = false
190219
if ocmSecret, err := r.getOCMSecret(ctx, ocmSecretRef); err != nil {
191220
return fmt.Errorf("error reading OCM Secret from ref %q: %w", ocmSecretRef, err)
192221
} else if token := ocmSecret.Data["token"]; len(token) > 0 {
193222
r.ocmToken = string(token)
223+
r.MachineType = MachineTypeMachinePool
194224
} else {
195225
return fmt.Errorf("token is missing from OCM Secret %q", ocmSecretRef)
196226
}
197-
if ok, err := r.machinePoolExists(); err != nil {
198-
return err
199-
} else if ok {
200-
klog.Info("Using machine pools for cluster auto-scaling")
201-
}
202227
}
203228

204229
return ctrl.NewControllerManagedBy(mgr).

controllers/machinepools.go

Lines changed: 1 addition & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -6,43 +6,13 @@ import (
66
"os"
77
"strings"
88

9-
ocmsdk "github.com/openshift-online/ocm-sdk-go"
109
cmv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
11-
configv1 "github.com/openshift/api/config/v1"
1210
arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
1311

14-
"k8s.io/apimachinery/pkg/types"
1512
"k8s.io/klog"
1613
ctrl "sigs.k8s.io/controller-runtime"
1714
)
1815

19-
func (r *AppWrapperReconciler) createOCMConnection() (*ocmsdk.Connection, error) {
20-
logger, err := ocmsdk.NewGoLoggerBuilder().
21-
Debug(false).
22-
Build()
23-
if err != nil {
24-
return nil, fmt.Errorf("can't build logger: %v", err)
25-
}
26-
27-
connection, err := ocmsdk.NewConnectionBuilder().
28-
Logger(logger).
29-
Tokens(r.ocmToken).
30-
Build()
31-
if err != nil {
32-
return nil, fmt.Errorf("can't build connection: %v", err)
33-
}
34-
35-
return connection, nil
36-
}
37-
38-
func hasAwLabel(machinePool *cmv1.MachinePool, aw *arbv1.AppWrapper) bool {
39-
value, ok := machinePool.Labels()[aw.Name]
40-
if ok && value == aw.Name {
41-
return true
42-
}
43-
return false
44-
}
45-
4616
func (r *AppWrapperReconciler) scaleMachinePool(ctx context.Context, aw *arbv1.AppWrapper, demandPerInstanceType map[string]int) (ctrl.Result, error) {
4717
connection, err := r.createOCMConnection()
4818
if err != nil {
@@ -62,7 +32,7 @@ func (r *AppWrapperReconciler) scaleMachinePool(ctx context.Context, aw *arbv1.A
6232

6333
numberOfMachines := 0
6434
response.Items().Each(func(machinePool *cmv1.MachinePool) bool {
65-
if machinePool.InstanceType() == userRequestedInstanceType && hasAwLabel(machinePool, aw) {
35+
if machinePool.InstanceType() == userRequestedInstanceType && hasAwLabel(machinePool.Labels(), aw) {
6636
numberOfMachines = machinePool.Replicas()
6737
return false
6838
}
@@ -115,46 +85,3 @@ func (r *AppWrapperReconciler) deleteMachinePool(ctx context.Context, aw *arbv1.
11585
})
11686
return ctrl.Result{Requeue: false}, nil
11787
}
118-
119-
func (r *AppWrapperReconciler) machinePoolExists() (bool, error) {
120-
connection, err := r.createOCMConnection()
121-
if err != nil {
122-
return false, fmt.Errorf("error creating OCM connection: %w", err)
123-
}
124-
defer connection.Close()
125-
126-
machinePools := connection.ClustersMgmt().V1().Clusters().Cluster(r.ocmClusterID).MachinePools()
127-
return machinePools != nil, nil
128-
}
129-
130-
// getOCMClusterID determines the internal clusterID to be used for OCM API calls
131-
func (r *AppWrapperReconciler) getOCMClusterID(ctx context.Context) error {
132-
cv := &configv1.ClusterVersion{}
133-
err := r.Get(ctx, types.NamespacedName{Name: "version"}, cv)
134-
if err != nil {
135-
return fmt.Errorf("can't get clusterversion: %v", err)
136-
}
137-
138-
internalClusterID := string(cv.Spec.ClusterID)
139-
140-
connection, err := r.createOCMConnection()
141-
if err != nil {
142-
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
143-
}
144-
defer connection.Close()
145-
146-
// Get the client for the resource that manages the collection of clusters:
147-
collection := connection.ClustersMgmt().V1().Clusters()
148-
149-
response, err := collection.List().Search(fmt.Sprintf("external_id = '%s'", internalClusterID)).Size(1).Page(1).SendContext(ctx)
150-
if err != nil {
151-
klog.Errorf(`Error getting cluster id: %v`, err)
152-
}
153-
154-
response.Items().Each(func(cluster *cmv1.Cluster) bool {
155-
r.ocmClusterID = cluster.ID()
156-
fmt.Printf("%s - %s - %s\n", cluster.ID(), cluster.Name(), cluster.State())
157-
return true
158-
})
159-
return nil
160-
}

controllers/nodepools.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package controllers
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"strings"
8+
9+
cmv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
10+
arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
11+
12+
"k8s.io/klog"
13+
ctrl "sigs.k8s.io/controller-runtime"
14+
)
15+
16+
func (r *AppWrapperReconciler) scaleNodePool(ctx context.Context, aw *arbv1.AppWrapper, demandPerInstanceType map[string]int) (ctrl.Result, error) {
17+
connection, err := r.createOCMConnection()
18+
if err != nil {
19+
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
20+
return ctrl.Result{}, err
21+
}
22+
defer connection.Close()
23+
for userRequestedInstanceType := range demandPerInstanceType {
24+
replicas := demandPerInstanceType[userRequestedInstanceType]
25+
26+
clusterNodePools := connection.ClustersMgmt().V1().Clusters().Cluster(r.ocmClusterID).NodePools()
27+
28+
response, err := clusterNodePools.List().SendContext(ctx)
29+
if err != nil {
30+
return ctrl.Result{}, err
31+
}
32+
33+
numberOfMachines := 0
34+
response.Items().Each(func(nodePool *cmv1.NodePool) bool {
35+
36+
if nodePool.AWSNodePool().InstanceType() == userRequestedInstanceType && hasAwLabel(nodePool.Labels(), aw) {
37+
numberOfMachines = nodePool.Replicas()
38+
return false
39+
}
40+
return true
41+
})
42+
43+
if numberOfMachines != replicas {
44+
m := make(map[string]string)
45+
m[aw.Name] = aw.Name
46+
klog.Infof("The instanceRequired array: %v", userRequestedInstanceType)
47+
48+
nodePoolID := strings.ReplaceAll(aw.Name+"-"+userRequestedInstanceType, ".", "-")
49+
50+
createNodePool, err := cmv1.NewNodePool().AWSNodePool(cmv1.NewAWSNodePool().InstanceType(userRequestedInstanceType)).ID(nodePoolID).Replicas(replicas).Labels(m).Build()
51+
if err != nil {
52+
klog.Errorf(`Error building NodePool: %v`, err)
53+
}
54+
klog.Infof("Built NodePool with instance type %v and name %v", userRequestedInstanceType, createNodePool.ID())
55+
response, err := clusterNodePools.Add().Body(createNodePool).SendContext(ctx)
56+
if err != nil {
57+
klog.Errorf(`Error creating NodePool: %v`, err)
58+
}
59+
klog.Infof("Created NodePool: %v", response)
60+
}
61+
}
62+
return ctrl.Result{Requeue: false}, nil
63+
}
64+
65+
func (r *AppWrapperReconciler) deleteNodePool(ctx context.Context, aw *arbv1.AppWrapper) (ctrl.Result, error) {
66+
connection, err := r.createOCMConnection()
67+
if err != nil {
68+
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
69+
return ctrl.Result{}, err
70+
}
71+
defer connection.Close()
72+
73+
nodePoolsConnection := connection.ClustersMgmt().V1().Clusters().Cluster(r.ocmClusterID).NodePools().List()
74+
75+
nodePoolsListResponse, _ := nodePoolsConnection.Send()
76+
nodePoolsList := nodePoolsListResponse.Items()
77+
nodePoolsList.Range(func(index int, item *cmv1.NodePool) bool {
78+
id, _ := item.GetID()
79+
if strings.Contains(id, aw.Name) {
80+
targetNodePool, err := connection.ClustersMgmt().V1().Clusters().Cluster(r.ocmClusterID).NodePools().NodePool(id).Delete().SendContext(ctx)
81+
if err != nil {
82+
klog.Infof("Error deleting target nodepool %v", targetNodePool)
83+
}
84+
klog.Infof("Successfully Scaled down target nodepool %v", id)
85+
}
86+
return true
87+
})
88+
return ctrl.Result{Requeue: false}, nil
89+
}
90+
91+
func (r *AppWrapperReconciler) checkHypershiftEnabled(ctx context.Context) (bool, error) {
92+
connection, err := r.createOCMConnection()
93+
if err != nil {
94+
return false, fmt.Errorf("error creating OCM connection: %w", err)
95+
}
96+
defer connection.Close()
97+
98+
clusterResource := connection.ClustersMgmt().V1().Clusters().Cluster(r.ocmClusterID)
99+
100+
response, err := clusterResource.Get().SendContext(ctx)
101+
if err != nil {
102+
return false, fmt.Errorf("error fetching cluster details: %w", err)
103+
}
104+
105+
body := response.Body()
106+
if body == nil {
107+
return false, fmt.Errorf("empty response body")
108+
}
109+
110+
hypershiftEnabled := false
111+
if body.Hypershift() != nil {
112+
hypershiftEnabled = body.Hypershift().Enabled()
113+
}
114+
115+
fmt.Printf("Hypershift enabled status: %v\n", hypershiftEnabled)
116+
return hypershiftEnabled, nil
117+
}

0 commit comments

Comments
 (0)