180 changes: 165 additions & 15 deletions cmd/gpu_plugin/rm/gpu_plugin_resource_manager.go
@@ -16,6 +16,14 @@ package rm

import (
"context"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/json"
"io"
"math/big"
"net"
"net/http"
"os"
"sort"
"strconv"
@@ -35,6 +43,7 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/utils/strings/slices"
)

const (
@@ -47,6 +56,13 @@ const (
grpcAddress = "unix:///var/lib/kubelet/pod-resources/kubelet.sock"
grpcBufferSize = 4 * 1024 * 1024
grpcTimeout = 5 * time.Second

kubeletAPITimeout = 5 * time.Second
kubeletAPIMaxRetries = 5
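// kubelet.crt is the kubelet's self-signed serving certificate. The
// DaemonSet hostPath-mounts it into the plugin container (see the
// add-kubelet-crt-mount.yaml patch below) so the TLS connection to the
// kubelet API can be verified.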
kubeletHTTPSCertPath = "/var/lib/kubelet/pki/kubelet.crt"
// This is detected incorrectly as credentials
//nolint:gosec
serviceAccountTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
)

// Errors.
@@ -100,12 +116,14 @@ type resourceManager struct {
prGetClientFunc getClientFunc
assignments map[string]podAssignmentDetails // pod name -> assignment details
nodeName string
hostIP string
skipID string
fullResourceName string
retryTimeout time.Duration
cleanupInterval time.Duration
mutex sync.RWMutex // for devTree updates during scan
cleanupMutex sync.RWMutex // for assignment details during cleanup
useKubelet bool
}

// NewDeviceInfo creates a new DeviceInfo.
@@ -135,30 +153,50 @@ func NewResourceManager(skipID, fullResourceName string) (ResourceManager, error) {

rm := resourceManager{
nodeName: os.Getenv("NODE_NAME"),
hostIP: os.Getenv("HOST_IP"),
clientset: clientset,
skipID: skipID,
fullResourceName: fullResourceName,
prGetClientFunc: podresources.GetV1Client,
assignments: make(map[string]podAssignmentDetails),
retryTimeout: 1 * time.Second,
cleanupInterval: 20 * time.Minute,
useKubelet: true,
}

klog.Info("GPU device plugin resource manager enabled")

// Try listing Pods once to detect if Kubelet API works
_, err = rm.listPodsFromKubelet()

if err != nil {
klog.V(2).Info("Not using Kubelet API")

rm.useKubelet = false
} else {
klog.V(2).Info("Using Kubelet API")
}

go func() {
getRandDuration := func() time.Duration {
cleanupIntervalSeconds := int(rm.cleanupInterval.Seconds())

n, _ := rand.Int(rand.Reader, big.NewInt(int64(cleanupIntervalSeconds)))

return rm.cleanupInterval/2 + time.Duration(n.Int64())*time.Second
}
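// With cleanupInterval = 20 minutes the next tick lands in roughly
// [10min, 30min): half the interval plus up to a full interval of
// crypto/rand jitter, so plugin instances across nodes do not all hit
// the API at the same moment.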

ticker := time.NewTicker(getRandDuration())

for range ticker.C {
klog.V(4).Info("Running cleanup")

ticker.Reset(getRandDuration())

// Gather both running and pending pods. Cleanup might otherwise run
// between GetPreferredAllocation and Allocate and remove the
// assignment data for a soon-to-be-allocated pod.
running := rm.listPodsOnNodeWithStates([]string{string(v1.PodRunning), string(v1.PodPending)})

func() {
rm.cleanupMutex.Lock()
@@ -189,29 +227,141 @@ func getPodResourceKey(res *podresourcesv1.PodResources) string {
return res.Namespace + "&" + res.Name
}

func (rm *resourceManager) listPodsFromAPIServer() (*v1.PodList, error) {
selector, err := fields.ParseSelector("spec.nodeName=" + rm.nodeName)

if err != nil {
return &v1.PodList{}, err
}

klog.V(4).Info("Requesting pods from API server")

podList, err := rm.clientset.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
FieldSelector: selector.String(),
})

if err != nil {
return &v1.PodList{}, err
}

return podList, nil
}

// +kubebuilder:rbac:groups="",resources=nodes/proxy,verbs=list;get

func (rm *resourceManager) listPodsFromKubelet() (*v1.PodList, error) {
var podList v1.PodList

token, err := os.ReadFile(serviceAccountTokenPath)
if err != nil {
klog.Warning("Failed to read token for kubelet API access: ", err)

return &podList, err
}

kubeletCert, err := os.ReadFile(kubeletHTTPSCertPath)
if err != nil {
klog.Warning("Failed to read kubelet cert: ", err)

return &podList, err
}

certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(kubeletCert)

// There is no official documentation for the kubelet API. There is a blog post:
// https://www.deepnetwork.com/blog/2020/01/13/kubelet-api.html
// And a tool to work with the API:
// https://github.com/cyberark/kubeletctl

kubeletURL := "https://" + rm.hostIP + ":10250/pods"
req, _ := http.NewRequestWithContext(context.Background(), "GET", kubeletURL, nil)
req.Header.Set("Authorization", "Bearer "+string(token))

tr := &http.Transport{
TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
RootCAs: certPool,
ServerName: rm.nodeName,
},
}
client := &http.Client{
Timeout: kubeletAPITimeout,
Transport: tr,
}

klog.V(4).Infof("Requesting pods from kubelet (%s)", kubeletURL)

resp, err := client.Do(req)
if err != nil {
klog.Warning("Failed to read pods from kubelet API: ", err)

return &podList, err
}

defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
klog.Warning("Failed to read http response body: ", err)

return &podList, err
}

err = json.Unmarshal(body, &podList)
if err != nil {
klog.Warning("Failed to unmarshal PodList from response: ", err)

return &podList, err
}

return &podList, nil
}
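The kubelet's /pods endpoint answers with a plain JSON-encoded v1.PodList, which is why a single json.Unmarshal suffices above. Below is a minimal, self-contained sketch of that wire contract; httptest stands in for the kubelet, and fakeKubelet plus the sample pod are illustrative names, not part of this PR:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Stand-in for a kubelet serving GET /pods with a one-pod PodList.
	fakeKubelet := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		list := v1.PodList{
			Items: []v1.Pod{{
				ObjectMeta: metav1.ObjectMeta{Name: "gpu-pod", Namespace: "default"},
				Status:     v1.PodStatus{Phase: v1.PodPending},
			}},
		}
		_ = json.NewEncoder(w).Encode(&list)
	}))
	defer fakeKubelet.Close()

	resp, err := http.Get(fakeKubelet.URL + "/pods")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Same decode step the plugin performs on the real kubelet response.
	var podList v1.PodList
	if err := json.NewDecoder(resp.Body).Decode(&podList); err != nil {
		panic(err)
	}

	fmt.Println(len(podList.Items), podList.Items[0].Status.Phase) // 1 Pending
}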

func (rm *resourceManager) listPods() (*v1.PodList, error) {
// Try to use kubelet API as long as it provides listings within retries
if rm.useKubelet {
var neterr net.Error

for i := 0; i < kubeletAPIMaxRetries; i++ {
if podList, err := rm.listPodsFromKubelet(); err == nil {
return podList, nil
} else if errors.As(err, &neterr) && neterr.Timeout() {
continue
}

// If error is non-timeout, break to stop using kubelet API
break
}

klog.Warning("Stopping Kubelet API use due to error/timeout")

rm.useKubelet = false
}

return rm.listPodsFromAPIServer()
}
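The loop above retries only on network timeouts. A hedged, standalone illustration of the errors.As plus net.Error classification it relies on (classify and the TEST-NET address are illustrative, not from this PR):

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// classify reports whether err is a network timeout worth retrying.
func classify(err error) string {
	var neterr net.Error
	if errors.As(err, &neterr) && neterr.Timeout() {
		return "timeout: retry"
	}
	return "permanent: fall back"
}

func main() {
	// Force a timeout with a 1ns deadline against a non-routable address.
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()

	req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://192.0.2.1:10250/pods", nil)
	_, err := http.DefaultClient.Do(req)
	fmt.Println(classify(err)) // timeout: retry
}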

func (rm *resourceManager) listPodsOnNodeWithStates(states []string) map[string]*v1.Pod {
pods := make(map[string]*v1.Pod)

podList, err := rm.listPods()
if err != nil {
klog.Error("pod listing failed:", err)

return pods
}

for i := range podList.Items {
key := getPodKey(&podList.Items[i])
pods[key] = &podList.Items[i]
phase := string(podList.Items[i].Status.Phase)
if slices.Contains(states, phase) {
key := getPodKey(&podList.Items[i])
pods[key] = &podList.Items[i]
}
}

return pods
@@ -516,7 +666,7 @@ func (rm *resourceManager) findAllocationPodCandidate() (*podCandidate, error) {

// getNodePendingGPUPods returns a map of pod names -> pods that are pending and use the gpu.
func (rm *resourceManager) getNodePendingGPUPods() (map[string]*v1.Pod, error) {
pendingPods := rm.listPodsOnNodeWithStates([]string{string(v1.PodPending)})

for podName, pod := range pendingPods {
if numGPUUsingContainers(pod, rm.fullResourceName) == 0 {
19 changes: 19 additions & 0 deletions cmd/gpu_plugin/rm/gpu_plugin_resource_manager_test.go
@@ -105,6 +105,7 @@ func newMockResourceManager(pods []v1.Pod) ResourceManager {
fullResourceName: "gpu.intel.com/i915",
assignments: make(map[string]podAssignmentDetails),
retryTimeout: 1 * time.Millisecond,
useKubelet: false,
}

deviceInfoMap := NewDeviceInfoMap()
@@ -168,6 +169,9 @@ func TestGetPreferredFractionalAllocation(t *testing.T) {
},
},
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
}

gpuLessTestPod := v1.Pod{
@@ -326,6 +330,9 @@ func TestCreateFractionalResourceResponse(t *testing.T) {
},
},
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
}
unAnnotatedTestPod := *properTestPod.DeepCopy()
unAnnotatedTestPod.ObjectMeta.Annotations = nil
@@ -458,6 +465,9 @@ func TestCreateFractionalResourceResponseWithOneCardTwoTiles(t *testing.T) {
},
},
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
}

properPrefContainerRequests := []*v1beta1.ContainerPreferredAllocationRequest{
@@ -521,6 +531,9 @@ func TestCreateFractionalResourceResponseWithTwoCardsOneTile(t *testing.T) {
},
},
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
}

properPrefContainerRequests := []*v1beta1.ContainerPreferredAllocationRequest{
@@ -589,6 +602,9 @@ func TestCreateFractionalResourceResponseWithThreeCardsTwoTiles(t *testing.T) {
},
},
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
}

properPrefContainerRequests := []*v1beta1.ContainerPreferredAllocationRequest{
@@ -664,6 +680,9 @@ func TestCreateFractionalResourceResponseWithMultipleContainersTileEach(t *testing.T) {
},
},
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
}

properPrefContainerRequests := []*v1beta1.ContainerPreferredAllocationRequest{
4 changes: 4 additions & 0 deletions deployments/gpu_plugin/base/intel-gpu-plugin.yaml
@@ -32,6 +32,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: HOST_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
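# status.hostIP gives the plugin the node address it dials for the
# kubelet API (https://<HOST_IP>:10250/pods).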
image: intel/intel-gpu-plugin:devel
imagePullPolicy: IfNotPresent
securityContext:
17 changes: 17 additions & 0 deletions deployments/gpu_plugin/overlays/fractional_resources/add-kubelet-crt-mount.yaml
@@ -0,0 +1,17 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: intel-gpu-plugin
spec:
template:
spec:
containers:
- name: intel-gpu-plugin
volumeMounts:
- name: kubeletcrt
mountPath: /var/lib/kubelet/pki/kubelet.crt
volumes:
- name: kubeletcrt
hostPath:
path: /var/lib/kubelet/pki/kubelet.crt
type: FileOrCreate
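# FileOrCreate lets the pod start even on nodes that have no kubelet cert
# at this path; the plugin then falls back to listing pods via the API server.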
@@ -4,5 +4,5 @@ metadata:
name: gpu-manager-role
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["list"]
resources: ["pods", "nodes/proxy"]
verbs: ["list", "get"]
@@ -9,3 +9,4 @@ patches:
- path: add-podresource-mount.yaml
- path: add-args.yaml
- path: add-nodeselector-intel-gpu.yaml
- path: add-kubelet-crt-mount.yaml
7 changes: 7 additions & 0 deletions deployments/operator/rbac/gpu_manager_role.yaml
@@ -5,6 +5,13 @@ metadata:
creationTimestamp: null
name: gpu-manager-role
rules:
- apiGroups:
- ""
resources:
- nodes/proxy
verbs:
- get
- list
- apiGroups:
- ""
resources: