@@ -21,6 +21,7 @@ import (
21
21
"fmt"
22
22
"net/http"
23
23
"net/url"
24
+ "strings"
24
25
"testing"
25
26
26
27
. "github.com/onsi/gomega"
@@ -40,14 +41,18 @@ import (
40
41
// directly managed by Kueue, and asserts successful completion of the training job.
41
42
42
43
func TestMnistRayJobRayClusterCpu (t * testing.T ) {
43
- runMnistRayJobRayCluster (t , "cpu" , 0 )
44
+ runMnistRayJobRayCluster (t , "cpu" , 0 , "nvidia.com/gpu" , GetRayImage () )
44
45
}
45
46
46
- func TestMnistRayJobRayClusterGpu (t * testing.T ) {
47
- runMnistRayJobRayCluster (t , "gpu" , 1 )
47
+ func TestMnistRayJobRayClusterCudaGpu (t * testing.T ) {
48
+ runMnistRayJobRayCluster (t , "gpu" , 1 , "nvidia.com/gpu" , GetRayImage () )
48
49
}
49
50
50
- func runMnistRayJobRayCluster (t * testing.T , accelerator string , numberOfGpus int ) {
51
+ func TestMnistRayJobRayClusterROCmGpu (t * testing.T ) {
52
+ runMnistRayJobRayCluster (t , "gpu" , 1 , "amd.com/gpu" , GetRayROCmImage ())
53
+ }
54
+
55
+ func runMnistRayJobRayCluster (t * testing.T , accelerator string , numberOfGpus int , gpuResourceName string , rayImage string ) {
51
56
test := With (t )
52
57
53
58
// Create a static namespace to ensure a consistent Ray Dashboard hostname entry in /etc/hosts before executing the test.
@@ -58,11 +63,11 @@ func runMnistRayJobRayCluster(t *testing.T, accelerator string, numberOfGpus int
58
63
defer func () {
59
64
_ = test .Client ().Kueue ().KueueV1beta1 ().ResourceFlavors ().Delete (test .Ctx (), resourceFlavor .Name , metav1.DeleteOptions {})
60
65
}()
61
- clusterQueue := createClusterQueue (test , resourceFlavor , numberOfGpus )
66
+ clusterQueue := createClusterQueue (test , resourceFlavor , numberOfGpus , gpuResourceName )
62
67
defer func () {
63
68
_ = test .Client ().Kueue ().KueueV1beta1 ().ClusterQueues ().Delete (test .Ctx (), clusterQueue .Name , metav1.DeleteOptions {})
64
69
}()
65
- CreateKueueLocalQueue (test , namespace .Name , clusterQueue .Name , AsDefaultQueue )
70
+ localQueue := CreateKueueLocalQueue (test , namespace .Name , clusterQueue .Name , AsDefaultQueue )
66
71
67
72
// Create MNIST training script
68
73
mnist := constructMNISTConfigMap (test , namespace )
@@ -71,7 +76,7 @@ func runMnistRayJobRayCluster(t *testing.T, accelerator string, numberOfGpus int
71
76
test .T ().Logf ("Created ConfigMap %s/%s successfully" , mnist .Namespace , mnist .Name )
72
77
73
78
// Create RayCluster and assign it to the localqueue
74
- rayCluster := constructRayCluster (test , namespace , mnist , numberOfGpus )
79
+ rayCluster := constructRayCluster (test , namespace , localQueue . Name , mnist , numberOfGpus , gpuResourceName , rayImage )
75
80
rayCluster , err = test .Client ().Ray ().RayV1 ().RayClusters (namespace .Name ).Create (test .Ctx (), rayCluster , metav1.CreateOptions {})
76
81
test .Expect (err ).NotTo (HaveOccurred ())
77
82
test .T ().Logf ("Created RayCluster %s/%s successfully" , rayCluster .Namespace , rayCluster .Name )
@@ -81,7 +86,7 @@ func runMnistRayJobRayCluster(t *testing.T, accelerator string, numberOfGpus int
81
86
Should (WithTransform (RayClusterState , Equal (rayv1 .Ready )))
82
87
83
88
// Create RayJob
84
- rayJob := constructRayJob (test , namespace , rayCluster , accelerator , numberOfGpus )
89
+ rayJob := constructRayJob (test , namespace , rayCluster , accelerator , numberOfGpus , gpuResourceName , rayImage )
85
90
rayJob , err = test .Client ().Ray ().RayV1 ().RayJobs (namespace .Name ).Create (test .Ctx (), rayJob , metav1.CreateOptions {})
86
91
test .Expect (err ).NotTo (HaveOccurred ())
87
92
test .T ().Logf ("Created RayJob %s/%s successfully" , rayJob .Namespace , rayJob .Name )
@@ -110,15 +115,19 @@ func runMnistRayJobRayCluster(t *testing.T, accelerator string, numberOfGpus int
110
115
}
111
116
112
117
func TestMnistRayJobRayClusterAppWrapperCpu (t * testing.T ) {
113
- runMnistRayJobRayClusterAppWrapper (t , "cpu" , 0 )
118
+ runMnistRayJobRayClusterAppWrapper (t , "cpu" , 0 , "nvidia.com/gpu" , GetRayImage () )
114
119
}
115
120
116
- func TestMnistRayJobRayClusterAppWrapperGpu (t * testing.T ) {
117
- runMnistRayJobRayClusterAppWrapper (t , "gpu" , 1 )
121
+ func TestMnistRayJobRayClusterAppWrapperCudaGpu (t * testing.T ) {
122
+ runMnistRayJobRayClusterAppWrapper (t , "gpu" , 1 , "nvidia.com/gpu" , GetRayImage ())
123
+ }
124
+
125
+ func TestMnistRayJobRayClusterAppWrapperROCmGpu (t * testing.T ) {
126
+ runMnistRayJobRayClusterAppWrapper (t , "gpu" , 1 , "amd.com/gpu" , GetRayROCmImage ())
118
127
}
119
128
120
129
// Same as TestMNISTRayJobRayCluster, except the RayCluster is wrapped in an AppWrapper
121
- func runMnistRayJobRayClusterAppWrapper (t * testing.T , accelerator string , numberOfGpus int ) {
130
+ func runMnistRayJobRayClusterAppWrapper (t * testing.T , accelerator string , numberOfGpus int , gpuResourceName string , rayImage string ) {
122
131
test := With (t )
123
132
124
133
// Create a static namespace to ensure a consistent Ray Dashboard hostname entry in /etc/hosts before executing the test.
@@ -129,7 +138,7 @@ func runMnistRayJobRayClusterAppWrapper(t *testing.T, accelerator string, number
129
138
defer func () {
130
139
_ = test .Client ().Kueue ().KueueV1beta1 ().ResourceFlavors ().Delete (test .Ctx (), resourceFlavor .Name , metav1.DeleteOptions {})
131
140
}()
132
- clusterQueue := createClusterQueue (test , resourceFlavor , numberOfGpus )
141
+ clusterQueue := createClusterQueue (test , resourceFlavor , numberOfGpus , gpuResourceName )
133
142
defer func () {
134
143
_ = test .Client ().Kueue ().KueueV1beta1 ().ClusterQueues ().Delete (test .Ctx (), clusterQueue .Name , metav1.DeleteOptions {})
135
144
}()
@@ -142,7 +151,7 @@ func runMnistRayJobRayClusterAppWrapper(t *testing.T, accelerator string, number
142
151
test .T ().Logf ("Created ConfigMap %s/%s successfully" , mnist .Namespace , mnist .Name )
143
152
144
153
// Create RayCluster, wrap in AppWrapper and assign to localqueue
145
- rayCluster := constructRayCluster (test , namespace , mnist , numberOfGpus )
154
+ rayCluster := constructRayCluster (test , namespace , localQueue . Name , mnist , numberOfGpus , gpuResourceName , rayImage )
146
155
raw := Raw (test , rayCluster )
147
156
raw = RemoveCreationTimestamp (test , raw )
148
157
@@ -183,7 +192,7 @@ func runMnistRayJobRayClusterAppWrapper(t *testing.T, accelerator string, number
183
192
Should (WithTransform (RayClusterState , Equal (rayv1 .Ready )))
184
193
185
194
// Create RayJob
186
- rayJob := constructRayJob (test , namespace , rayCluster , accelerator , numberOfGpus )
195
+ rayJob := constructRayJob (test , namespace , rayCluster , accelerator , numberOfGpus , gpuResourceName , rayImage )
187
196
rayJob , err = test .Client ().Ray ().RayV1 ().RayJobs (namespace .Name ).Create (test .Ctx (), rayJob , metav1.CreateOptions {})
188
197
test .Expect (err ).NotTo (HaveOccurred ())
189
198
test .T ().Logf ("Created RayJob %s/%s successfully" , rayJob .Namespace , rayJob .Name )
@@ -223,11 +232,11 @@ func TestRayClusterImagePullSecret(t *testing.T) {
223
232
defer func () {
224
233
_ = test .Client ().Kueue ().KueueV1beta1 ().ResourceFlavors ().Delete (test .Ctx (), resourceFlavor .Name , metav1.DeleteOptions {})
225
234
}()
226
- clusterQueue := createClusterQueue (test , resourceFlavor , 0 )
235
+ clusterQueue := createClusterQueue (test , resourceFlavor , 0 , "nvidia.com/gpu" )
227
236
defer func () {
228
237
_ = test .Client ().Kueue ().KueueV1beta1 ().ClusterQueues ().Delete (test .Ctx (), clusterQueue .Name , metav1.DeleteOptions {})
229
238
}()
230
- CreateKueueLocalQueue (test , namespace .Name , clusterQueue .Name , AsDefaultQueue )
239
+ localQueue := CreateKueueLocalQueue (test , namespace .Name , clusterQueue .Name , AsDefaultQueue )
231
240
232
241
// Create MNIST training script
233
242
mnist := constructMNISTConfigMap (test , namespace )
@@ -236,7 +245,7 @@ func TestRayClusterImagePullSecret(t *testing.T) {
236
245
test .T ().Logf ("Created ConfigMap %s/%s successfully" , mnist .Namespace , mnist .Name )
237
246
238
247
// Create RayCluster with imagePullSecret and assign it to the localqueue
239
- rayCluster := constructRayCluster (test , namespace , mnist , 0 )
248
+ rayCluster := constructRayCluster (test , namespace , localQueue . Name , mnist , 0 , "nvidia.com/gpu" , GetRayImage () )
240
249
rayCluster .Spec .HeadGroupSpec .Template .Spec .ImagePullSecrets = []corev1.LocalObjectReference {{Name : "custom-pull-secret" }}
241
250
rayCluster , err = test .Client ().Ray ().RayV1 ().RayClusters (namespace .Name ).Create (test .Ctx (), rayCluster , metav1.CreateOptions {})
242
251
test .Expect (err ).NotTo (HaveOccurred ())
@@ -266,7 +275,7 @@ func constructMNISTConfigMap(test Test, namespace *corev1.Namespace) *corev1.Con
266
275
}
267
276
}
268
277
269
- func constructRayCluster (_ Test , namespace * corev1.Namespace , mnist * corev1.ConfigMap , numberOfGpus int ) * rayv1.RayCluster {
278
+ func constructRayCluster (_ Test , namespace * corev1.Namespace , localQueueName string , mnist * corev1.ConfigMap , numberOfGpus int , gpuResourceName string , rayImage string ) * rayv1.RayCluster {
270
279
return & rayv1.RayCluster {
271
280
TypeMeta : metav1.TypeMeta {
272
281
APIVersion : rayv1 .GroupVersion .String (),
@@ -275,6 +284,9 @@ func constructRayCluster(_ Test, namespace *corev1.Namespace, mnist *corev1.Conf
275
284
ObjectMeta : metav1.ObjectMeta {
276
285
Name : "raycluster" ,
277
286
Namespace : namespace .Name ,
287
+ Labels : map [string ]string {
288
+ "kueue.x-k8s.io/queue-name" : localQueueName ,
289
+ },
278
290
},
279
291
Spec : rayv1.RayClusterSpec {
280
292
RayVersion : GetRayVersion (),
@@ -287,7 +299,7 @@ func constructRayCluster(_ Test, namespace *corev1.Namespace, mnist *corev1.Conf
287
299
Containers : []corev1.Container {
288
300
{
289
301
Name : "ray-head" ,
290
- Image : GetRayImage () ,
302
+ Image : rayImage ,
291
303
Ports : []corev1.ContainerPort {
292
304
{
293
305
ContainerPort : 6379 ,
@@ -335,14 +347,14 @@ func constructRayCluster(_ Test, namespace *corev1.Namespace, mnist *corev1.Conf
335
347
Spec : corev1.PodSpec {
336
348
Tolerations : []corev1.Toleration {
337
349
{
338
- Key : "nvidia.com/gpu" ,
350
+ Key : gpuResourceName ,
339
351
Operator : corev1 .TolerationOpExists ,
340
352
},
341
353
},
342
354
Containers : []corev1.Container {
343
355
{
344
356
Name : "ray-worker" ,
345
- Image : GetRayImage () ,
357
+ Image : rayImage ,
346
358
Lifecycle : & corev1.Lifecycle {
347
359
PreStop : & corev1.LifecycleHandler {
348
360
Exec : & corev1.ExecAction {
@@ -352,14 +364,14 @@ func constructRayCluster(_ Test, namespace *corev1.Namespace, mnist *corev1.Conf
352
364
},
353
365
Resources : corev1.ResourceRequirements {
354
366
Requests : corev1.ResourceList {
355
- corev1 .ResourceCPU : resource .MustParse ("250m" ),
356
- corev1 .ResourceMemory : resource .MustParse ("1G" ),
357
- "nvidia.com/gpu" : resource .MustParse (fmt .Sprint (numberOfGpus )),
367
+ corev1 .ResourceCPU : resource .MustParse ("250m" ),
368
+ corev1 .ResourceMemory : resource .MustParse ("1G" ),
369
+ corev1 . ResourceName ( gpuResourceName ): resource .MustParse (fmt .Sprint (numberOfGpus )),
358
370
},
359
371
Limits : corev1.ResourceList {
360
- corev1 .ResourceCPU : resource .MustParse ("2" ),
361
- corev1 .ResourceMemory : resource .MustParse ("4G" ),
362
- "nvidia.com/gpu" : resource .MustParse (fmt .Sprint (numberOfGpus )),
372
+ corev1 .ResourceCPU : resource .MustParse ("2" ),
373
+ corev1 .ResourceMemory : resource .MustParse ("4G" ),
374
+ corev1 . ResourceName ( gpuResourceName ): resource .MustParse (fmt .Sprint (numberOfGpus )),
363
375
},
364
376
},
365
377
VolumeMounts : []corev1.VolumeMount {
@@ -390,7 +402,22 @@ func constructRayCluster(_ Test, namespace *corev1.Namespace, mnist *corev1.Conf
390
402
}
391
403
}
392
404
393
- func constructRayJob (_ Test , namespace * corev1.Namespace , rayCluster * rayv1.RayCluster , accelerator string , numberOfGpus int ) * rayv1.RayJob {
405
+ func constructRayJob (_ Test , namespace * corev1.Namespace , rayCluster * rayv1.RayCluster , accelerator string , numberOfGpus int , gpuResourceName string , rayImage string ) * rayv1.RayJob {
406
+ pipPackages := []string {
407
+ "pytorch_lightning==2.4.0" ,
408
+ "torchmetrics==1.6.0" ,
409
+ "torchvision==0.19.1" ,
410
+ }
411
+
412
+ // Append AMD-specific packages
413
+ if gpuResourceName == "amd.com/gpu" {
414
+ pipPackages = append (pipPackages ,
415
+ "--extra-index-url https://download.pytorch.org/whl/rocm6.1" ,
416
+ "torch==2.4.1+rocm6.1" ,
417
+ )
418
+ }
419
+
420
+ // Construct RayJob with the final pip list
394
421
return & rayv1.RayJob {
395
422
TypeMeta : metav1.TypeMeta {
396
423
APIVersion : rayv1 .GroupVersion .String (),
@@ -402,17 +429,15 @@ func constructRayJob(_ Test, namespace *corev1.Namespace, rayCluster *rayv1.RayC
402
429
},
403
430
Spec : rayv1.RayJobSpec {
404
431
Entrypoint : "python /home/ray/jobs/mnist.py" ,
405
- RuntimeEnvYAML : `
406
- pip:
407
- - pytorch_lightning==2.4.0
408
- - torchmetrics==1.6.0
409
- - torchvision==0.20.1
410
- env_vars:
411
- MNIST_DATASET_URL: "` + GetMnistDatasetURL () + `"
412
- PIP_INDEX_URL: "` + GetPipIndexURL () + `"
413
- PIP_TRUSTED_HOST: "` + GetPipTrustedHost () + `"
414
- ACCELERATOR: "` + accelerator + `"
415
- ` ,
432
+ RuntimeEnvYAML : fmt .Sprintf (`
433
+ pip:
434
+ - %s
435
+ env_vars:
436
+ MNIST_DATASET_URL: "%s"
437
+ PIP_INDEX_URL: "%s"
438
+ PIP_TRUSTED_HOST: "%s"
439
+ ACCELERATOR: "%s"
440
+ ` , strings .Join (pipPackages , "\n - " ), GetMnistDatasetURL (), GetPipIndexURL (), GetPipTrustedHost (), accelerator ),
416
441
ClusterSelector : map [string ]string {
417
442
RayJobDefaultClusterSelectorKey : rayCluster .Name ,
418
443
},
@@ -422,7 +447,7 @@ func constructRayJob(_ Test, namespace *corev1.Namespace, rayCluster *rayv1.RayC
422
447
RestartPolicy : corev1 .RestartPolicyNever ,
423
448
Containers : []corev1.Container {
424
449
{
425
- Image : GetRayImage () ,
450
+ Image : rayImage ,
426
451
Name : "rayjob-submitter-pod" ,
427
452
},
428
453
},
@@ -477,12 +502,12 @@ func getRayDashboardURL(test Test, namespace, rayClusterName string) string {
477
502
}
478
503
479
504
// Create ClusterQueue
480
- func createClusterQueue (test Test , resourceFlavor * v1beta1.ResourceFlavor , numberOfGpus int ) * v1beta1.ClusterQueue {
505
+ func createClusterQueue (test Test , resourceFlavor * v1beta1.ResourceFlavor , numberOfGpus int , gpuResourceName string ) * v1beta1.ClusterQueue {
481
506
cqSpec := v1beta1.ClusterQueueSpec {
482
507
NamespaceSelector : & metav1.LabelSelector {},
483
508
ResourceGroups : []v1beta1.ResourceGroup {
484
509
{
485
- CoveredResources : []corev1.ResourceName {corev1 .ResourceName ("cpu" ), corev1 .ResourceName ("memory" ), corev1 .ResourceName ("nvidia.com/gpu" )},
510
+ CoveredResources : []corev1.ResourceName {corev1 .ResourceName ("cpu" ), corev1 .ResourceName ("memory" ), corev1 .ResourceName (gpuResourceName )},
486
511
Flavors : []v1beta1.FlavorQuotas {
487
512
{
488
513
Name : v1beta1 .ResourceFlavorReference (resourceFlavor .Name ),
@@ -496,7 +521,7 @@ func createClusterQueue(test Test, resourceFlavor *v1beta1.ResourceFlavor, numbe
496
521
NominalQuota : resource .MustParse ("12Gi" ),
497
522
},
498
523
{
499
- Name : corev1 .ResourceName ("nvidia.com/gpu" ),
524
+ Name : corev1 .ResourceName (gpuResourceName ),
500
525
NominalQuota : resource .MustParse (fmt .Sprint (numberOfGpus )),
501
526
},
502
527
},
0 commit comments