@@ -15,13 +15,16 @@ import (
15
15
16
16
const uriTemplate string = "tcp://%s:%s/status"
17
17
18
+ // customWatcher is a custom implementation of the cache.Watcher interface,
19
+ // designed to watch Kubernetes pods based on specific label selectors and namespace.
18
20
type customWatcher struct {
19
21
clientset * kubernetes.Clientset
20
22
labelSelector string
21
23
namespace string
22
24
}
23
25
24
- // newWatcher creates a new instance of customWatcher.
26
+ // newWatcher creates and returns a new instance of customWatcher.
27
+ // It is used to initialize the watcher with a Kubernetes clientset, a namespace, and a label selector for filtering the pods to be monitored.
25
28
func newWatcher (clientset * kubernetes.Clientset , namespace string , podLabels string ) cache.Watcher {
26
29
return & customWatcher {
27
30
clientset : clientset ,
@@ -30,7 +33,11 @@ func newWatcher(clientset *kubernetes.Clientset, namespace string, podLabels str
30
33
}
31
34
}
32
35
33
- // Watch starts a new watch session for Pods
36
+ // Watch initiates a new watch session for Pods by establishing a connection to the Kubernetes API.
37
+ // This function is used as part of the NewRetryWatcher setup, which ensures a resilient connection.
38
+ // If the connection to the API is interrupted, the NewRetryWatcher will automatically attempt to re-establish it,
39
+ // providing continuous monitoring of pod events. This approach is ideal for maintaining reliable event streaming,
40
+ // especially in cases of network instability or API server disruptions.
34
41
func (c * customWatcher ) Watch (options metav1.ListOptions ) (apiWatch.Interface , error ) {
35
42
options .LabelSelector = c .labelSelector
36
43
ns := c .namespace
@@ -57,7 +64,7 @@ func k8sGetClient() (*kubernetes.Clientset, error) {
57
64
return clientset , nil
58
65
}
59
66
60
- // listPods returns the initial list of pods
67
+ // listPods retrieves the initial list of pods that match the specified label criteria and namespace.
61
68
func listPods (clientset * kubernetes.Clientset , namespace string , podLabels string ) (* v1.PodList , error ) {
62
69
if namespace == "" {
63
70
namespace = metav1 .NamespaceAll
@@ -69,10 +76,12 @@ func listPods(clientset *kubernetes.Clientset, namespace string, podLabels strin
69
76
return podList , nil
70
77
}
71
78
72
- // initializePodEnlisting lists all Pods that match the criteria and initializes their state in PoolManager.
79
+ // initializePodEnlisting retrieves all pods matching the specified criteria and appends their URIs to the PoolManager's PodPhases.
80
+ // This function is invoked prior to starting the NewRetryWatcher to capture the initial state of existing pods
81
+ // and to obtain the ResourceVersion required for initializing the NewRetryWatcher.
73
82
func (pm * PoolManager ) initialPodEnlisting (exporter * Exporter , podList * v1.PodList , port string ) (string , error ) {
74
83
75
- log .Infof ("Found %d pods during initial list" , len (podList .Items ))
84
+ log .Infof ("Found %d pod(s) during initial list" , len (podList .Items ))
76
85
for _ , pod := range podList .Items {
77
86
podName := pod .Name
78
87
currentPhase := pod .Status .Phase
@@ -84,20 +93,20 @@ func (pm *PoolManager) initialPodEnlisting(exporter *Exporter, podList *v1.PodLi
84
93
return podList .ResourceVersion , nil
85
94
}
86
95
87
- // handlePodRunning handles actions for a pod that is in the Running phase.
96
+ // handlePodRunning is used when a pod is in the Running phase and needs to be appended into the pool manager's PodPhases .
88
97
func (pm * PoolManager ) handlePodRunning (exporter * Exporter , pod * v1.Pod , uri string ) {
89
98
ip := pod .Status .PodIP
90
99
podName := pod .Name
91
100
if ip != "" {
92
- log .Infof ("Handling Running pod %s with IP %s" , podName , ip )
101
+ log .Infof ("New running pod detected %s with IP %s" , podName , ip )
93
102
pm .Add (uri )
94
103
exporter .UpdatePoolManager (* pm )
95
104
} else {
96
- log .Debugf ("Pod %s is Running but has no IP assigned" , podName )
105
+ log .Debugf ("Pod %s is in Running state but has no IP assigned" , podName )
97
106
}
98
107
}
99
108
100
- // podAdded processes a newly added pod.
109
+ // processPodAdded handles the addition of a newly created pod to the cluster by appending its URI to the pool manager .
101
110
func (pm * PoolManager ) processPodAdded (exporter * Exporter , pod * v1.Pod , uri string ) {
102
111
pm .PodPhases [pod .Name ] = pod .Status .Phase
103
112
@@ -106,7 +115,9 @@ func (pm *PoolManager) processPodAdded(exporter *Exporter, pod *v1.Pod, uri stri
106
115
}
107
116
}
108
117
109
- // podModified processes modifications to an existing pod.
118
+ // processPodModified handles events triggered by pod modifications, including when a new pod is added to the cluster.
119
+ // To be included in the pool manager, the pod must be in the "Running" phase. The function checks the pod's current phase
120
+ // and, if it is running, calls handlePodRunning to append the pod to the pool manager's PodPhases.
110
121
func (pm * PoolManager ) processPodModified (exporter * Exporter , pod * v1.Pod , uri string ) {
111
122
lastPhase , exists := pm .PodPhases [pod .Name ]
112
123
@@ -117,7 +128,7 @@ func (pm *PoolManager) processPodModified(exporter *Exporter, pod *v1.Pod, uri s
117
128
pm .PodPhases [pod .Name ] = pod .Status .Phase
118
129
}
119
130
120
- // podDeleted processes the deletion of a pod .
131
+ // processPodDeleted handles the removal of a pods URI from the pool manager's PodPhases .
121
132
func (pm * PoolManager ) processPodDeleted (exporter * Exporter , pod * v1.Pod , uri string ) {
122
133
ip := pod .Status .PodIP
123
134
@@ -127,7 +138,9 @@ func (pm *PoolManager) processPodDeleted(exporter *Exporter, pod *v1.Pod, uri st
127
138
delete (pm .PodPhases , pod .Name )
128
139
}
129
140
130
- // DiscoverPods finds pods with the specified annotation in the given namespace.
141
+ // DiscoverPods begins by listing the pods that match the specified labels within the given namespace.
142
+ // It then starts a watch session in a separate goroutine.
143
+ // The list operation is performed first to retrieve the initial ResourceVersion, which is required to initialize a NewRetryWatcher.
131
144
func (pm * PoolManager ) DiscoverPods (exporter * Exporter , namespace string , podLabels string , port string ) error {
132
145
// Get the Kubernetes client
133
146
clientset , err := k8sGetClient ()
@@ -147,15 +160,19 @@ func (pm *PoolManager) DiscoverPods(exporter *Exporter, namespace string, podLab
147
160
return nil
148
161
}
149
162
150
- // watchPodEvents watches for pod events and processes them.
163
+ // watchPodEvents monitors pod events and processes them accordingly:
164
+ // - For "added" events, the new pod's URI is appended to the pool manager.
165
+ // - For "modified" events, it verifies if the pod is in the running state before appending its URI to the pool manager.
166
+ // - For "deleted" events, the pod's URI is removed from the pool manager's PodPhases.
167
+ // Note: There is an unresolved issue with timeout errors when a pod is deleted, which requires further investigation and handling.
151
168
func (pm * PoolManager ) watchPodEvents (exporter * Exporter , watcher cache.Watcher , resourceVersion string , port string ) {
152
169
retryWatcher , err := watch .NewRetryWatcher (resourceVersion , watcher )
153
170
if err != nil {
154
- log .Errorf ("Failed to create RetryWatcher : %v" , err )
171
+ log .Errorf ("Failed to create Retry Watcher : %v" , err )
155
172
return
156
173
}
157
174
defer retryWatcher .Stop ()
158
- log .Info ("RetryWatcher initialized successfully" )
175
+ log .Info ("Retry Watcher initialized successfully" )
159
176
160
177
for event := range retryWatcher .ResultChan () {
161
178
pod , ok := event .Object .(* v1.Pod )
0 commit comments