Skip to content

Commit fc198c6

Browse files
committed
Create list and watch functionality
1 parent 7a4e94d commit fc198c6

File tree

3 files changed

+127
-95
lines changed

3 files changed

+127
-95
lines changed

cmd/server.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/prometheus/client_golang/prometheus"
2626
"github.com/prometheus/client_golang/prometheus/promhttp"
2727
"github.com/spf13/cobra"
28+
v1 "k8s.io/api/core/v1"
2829
)
2930

3031
// Configuration variables
@@ -52,7 +53,9 @@ to quickly create a Cobra application.`,
5253
Run: func(cmd *cobra.Command, args []string) {
5354
log.Infof("Starting server on %v with path %v", listeningAddress, metricsEndpoint)
5455

55-
pm := phpfpm.PoolManager{}
56+
pm := phpfpm.PoolManager{
57+
PodPhases: make(map[string]v1.PodPhase),
58+
}
5659
// Initialize the Exporter before any dynamic or static setup
5760
exporter := phpfpm.NewExporter(pm)
5861

@@ -61,7 +64,7 @@ to quickly create a Cobra application.`,
6164
log.Info("Kubernetes auto-tracking enabled. Watching for pod changes...")
6265

6366
go func() {
64-
if err := pm.DiscoverPods(namespace, podLabels, port, exporter); err != nil {
67+
if err := pm.DiscoverPods(exporter, namespace, podLabels, port); err != nil {
6568
log.Error(err)
6669
}
6770
}()

phpfpm/phpfpm.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"time"
2727

2828
fcgiclient "github.com/tomasen/fcgi_client"
29+
v1 "k8s.io/api/core/v1"
2930
)
3031

3132
// PoolProcessRequestIdle defines a process that is idle.
@@ -60,7 +61,8 @@ type logger interface {
6061

6162
// PoolManager manages all configured Pools
6263
type PoolManager struct {
63-
Pools []Pool `json:"pools"`
64+
Pools []Pool `json:"pools"`
65+
PodPhases map[string]v1.PodPhase `json:"podPhases"`
6466
}
6567

6668
// Pool describes a single PHP-FPM pool that can be reached via a Socket or TCP address
@@ -148,7 +150,7 @@ func (pm *PoolManager) Update() (err error) {
148150
}
149151

150152
// Remove will remove a pool from the pool manager based on the given URI.
151-
func (pm *PoolManager) Remove(uri string, exporter *Exporter) {
153+
func (pm *PoolManager) Remove(exporter *Exporter, uri string) {
152154
wg := &sync.WaitGroup{}
153155

154156
started := time.Now()

phpfpm/pod_discovery.go

Lines changed: 118 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,33 @@ import (
1313
"k8s.io/client-go/tools/watch"
1414
)
1515

16-
const uriTemplate string = "tcp://%%s:%s/status"
16+
const uriTemplate string = "tcp://%s:%s/status"
1717

1818
type customWatcher struct {
1919
clientset *kubernetes.Clientset
2020
labelSelector string
2121
namespace string
2222
}
2323

24+
// newWatcher creates a new instance of customWatcher.
25+
func newWatcher(clientset *kubernetes.Clientset, namespace string, podLabels string) cache.Watcher {
26+
return &customWatcher{
27+
clientset: clientset,
28+
namespace: namespace,
29+
labelSelector: podLabels,
30+
}
31+
}
32+
33+
// Watch starts a new watch session for Pods
34+
func (c *customWatcher) Watch(options metav1.ListOptions) (apiWatch.Interface, error) {
35+
options.LabelSelector = c.labelSelector
36+
ns := c.namespace
37+
if ns == "" {
38+
ns = metav1.NamespaceAll
39+
}
40+
return c.clientset.CoreV1().Pods(c.namespace).Watch(context.TODO(), options)
41+
}
42+
2443
// k8sGetClient returns a Kubernetes clientset to interact with the cluster.
2544
// This is intended to be used when the application is running inside a Kubernetes pod.
2645
func k8sGetClient() (*kubernetes.Clientset, error) {
@@ -38,115 +57,123 @@ func k8sGetClient() (*kubernetes.Clientset, error) {
3857
return clientset, nil
3958
}
4059

41-
// Watch starts a new watch session for Pods
42-
func (c *customWatcher) Watch(options metav1.ListOptions) (apiWatch.Interface, error) {
43-
if c.namespace == "" {
44-
c.namespace = metav1.NamespaceAll
60+
// listPods returns the initial list of pods
61+
func listPods(clientset *kubernetes.Clientset, namespace string, podLabels string) (*v1.PodList, error) {
62+
if namespace == "" {
63+
namespace = metav1.NamespaceAll
4564
}
46-
options.LabelSelector = c.labelSelector
47-
return c.clientset.CoreV1().Pods(c.namespace).Watch(context.TODO(), options)
65+
podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: podLabels})
66+
if err != nil {
67+
return nil, fmt.Errorf("failed to list pods: %w", err)
68+
}
69+
return podList, nil
4870
}
4971

50-
func newWatcher(clientset *kubernetes.Clientset, namespace string, podLabels string) cache.Watcher {
51-
return &customWatcher{clientset: clientset, namespace: namespace, labelSelector: podLabels}
72+
// initializePodEnlisting lists all Pods that match the criteria and initializes their state in PoolManager.
73+
func (pm *PoolManager) initialPodEnlisting(exporter *Exporter, podList *v1.PodList, port string) (string, error) {
74+
75+
log.Infof("Found %d pods during initial list", len(podList.Items))
76+
for _, pod := range podList.Items {
77+
podName := pod.Name
78+
currentPhase := pod.Status.Phase
79+
log.Debugf("Processing pod from initial list: %s, phase: %s", podName, currentPhase)
80+
81+
uri := fmt.Sprintf(uriTemplate, pod.Status.PodIP, port)
82+
pm.processPodAdded(exporter, &pod, uri)
83+
}
84+
return podList.ResourceVersion, nil
85+
}
86+
87+
// handlePodRunning handles actions for a pod that is in the Running phase.
88+
func (pm *PoolManager) handlePodRunning(exporter *Exporter, pod *v1.Pod, uri string) {
89+
ip := pod.Status.PodIP
90+
podName := pod.Name
91+
if ip != "" {
92+
log.Infof("Handling Running pod %s with IP %s", podName, ip)
93+
pm.Add(uri)
94+
exporter.UpdatePoolManager(*pm)
95+
} else {
96+
log.Debugf("Pod %s is Running but has no IP assigned", podName)
97+
}
98+
}
99+
100+
// podAdded processes a newly added pod.
101+
func (pm *PoolManager) processPodAdded(exporter *Exporter, pod *v1.Pod, uri string) {
102+
pm.PodPhases[pod.Name] = pod.Status.Phase
103+
104+
if pod.Status.Phase == v1.PodRunning {
105+
pm.handlePodRunning(exporter, pod, uri)
106+
}
107+
}
108+
109+
// podModified processes modifications to an existing pod.
110+
func (pm *PoolManager) processPodModified(exporter *Exporter, pod *v1.Pod, uri string) {
111+
lastPhase, exists := pm.PodPhases[pod.Name]
112+
113+
if exists && lastPhase == v1.PodPending && pod.Status.Phase == v1.PodRunning {
114+
log.Infof("Pod %s transitioned from Pending to Running", pod.Name)
115+
pm.handlePodRunning(exporter, pod, uri)
116+
}
117+
pm.PodPhases[pod.Name] = pod.Status.Phase
118+
}
119+
120+
// podDeleted processes the deletion of a pod.
121+
func (pm *PoolManager) processPodDeleted(exporter *Exporter, pod *v1.Pod, uri string) {
122+
ip := pod.Status.PodIP
123+
124+
log.Infof("Removing pod %s with IP %s from PoolManager", pod.Name, ip)
125+
pm.Remove(exporter, uri)
126+
127+
delete(pm.PodPhases, pod.Name)
52128
}
53129

54130
// DiscoverPods finds pods with the specified annotation in the given namespace.
55-
func (pm *PoolManager) DiscoverPods(namespace string, podLabels string, port string, exporter *Exporter) error {
131+
func (pm *PoolManager) DiscoverPods(exporter *Exporter, namespace string, podLabels string, port string) error {
56132
// Get the Kubernetes client
57133
clientset, err := k8sGetClient()
58134
if err != nil {
59135
return err
60136
}
61137

62-
log.Info("Test 1.9.17")
63-
64-
var podPhases = make(map[string]v1.PodPhase)
65-
66138
watcher := newWatcher(clientset, namespace, podLabels)
67139

68-
// Watch for pod events
69-
go func() {
70-
retryWatcher, err := watch.NewRetryWatcher("1", watcher)
71-
if err != nil {
72-
log.Errorf("Failed to create RetryWatcher: %v", err)
73-
}
74-
defer retryWatcher.Stop()
75-
log.Info("RetryWatcher successfully initialized")
76-
77-
uriTemplate := fmt.Sprintf(uriTemplate, port)
78-
79-
for event := range retryWatcher.ResultChan() {
80-
pod, ok := event.Object.(*v1.Pod)
81-
if !ok {
82-
log.Errorf("Unexpected type in podWatch: %v", event.Object)
83-
continue
84-
}
85-
log.Debug("I am inside the go routine")
86-
87-
podName := pod.Name
88-
currentPhase := pod.Status.Phase
89-
90-
log.Debugf("Received event for pod: %s, type: %s, current phase: %s", podName, event.Type, currentPhase)
91-
92-
switch event.Type {
93-
case apiWatch.Added:
94-
// Initialize the pod's phase in the map
95-
pm.podAdded(podPhases, podName, currentPhase, pod, exporter, uriTemplate)
96-
fmt.Printf("Added %s", podPhases)
97-
case apiWatch.Modified:
98-
// Check for the Pending → Running transition
99-
pm.podModified(podPhases, podName, currentPhase, pod, exporter, uriTemplate)
100-
fmt.Printf("Modified %s", podPhases)
101-
case apiWatch.Deleted:
102-
pm.podDeleted(podPhases, podName, pod, exporter, uriTemplate)
103-
fmt.Printf("Deleted %s", podPhases)
104-
}
105-
}
106-
}()
140+
podList, err := listPods(clientset, namespace, podLabels)
141+
initialResourceVersion, err := pm.initialPodEnlisting(exporter, podList, port)
142+
if err != nil {
143+
return err
144+
}
145+
146+
go pm.watchPodEvents(exporter, watcher, initialResourceVersion, port)
107147
return nil
108148
}
109149

110-
func (pm *PoolManager) podModified(podPhases map[string]v1.PodPhase, podName string, currentPhase v1.PodPhase, pod *v1.Pod, exporter *Exporter, uriTemplate string) {
111-
lastPhase, exists := podPhases[podName]
112-
if exists && lastPhase == v1.PodPending && currentPhase == v1.PodRunning {
113-
log.Infof("Pod %s transitioned from Pending to Running", podName)
114-
115-
ip := pod.Status.PodIP
116-
if ip != "" {
117-
uri := fmt.Sprintf(uriTemplate, ip)
118-
log.Infof("Adding Running pod %s with IP %s", podName, ip)
119-
pm.Add(uri)
120-
exporter.UpdatePoolManager(*pm)
121-
} else {
122-
log.Debugf("Pod %s is Running but has no IP assigned", podName)
123-
}
150+
// watchPodEvents watches for pod events and processes them.
151+
func (pm *PoolManager) watchPodEvents(exporter *Exporter, watcher cache.Watcher, resourceVersion string, port string) {
152+
retryWatcher, err := watch.NewRetryWatcher(resourceVersion, watcher)
153+
if err != nil {
154+
log.Errorf("Failed to create RetryWatcher: %v", err)
155+
return
124156
}
125-
podPhases[podName] = currentPhase
126-
}
127-
128-
func (pm *PoolManager) podAdded(podPhases map[string]v1.PodPhase, podName string, currentPhase v1.PodPhase, pod *v1.Pod, exporter *Exporter, uriTemplate string) {
129-
podPhases[podName] = currentPhase
130-
131-
if currentPhase == v1.PodRunning {
132-
ip := pod.Status.PodIP
133-
if ip != "" {
134-
uri := fmt.Sprintf(uriTemplate, ip)
135-
log.Infof("New pod %s added and already Running with IP %s", podName, ip)
136-
pm.Add(uri)
137-
exporter.UpdatePoolManager(*pm)
138-
} else {
139-
log.Debugf("Pod %s added but has no IP yet", podName)
157+
defer retryWatcher.Stop()
158+
log.Info("RetryWatcher initialized successfully")
159+
160+
for event := range retryWatcher.ResultChan() {
161+
pod, ok := event.Object.(*v1.Pod)
162+
if !ok {
163+
log.Errorf("Unexpected type in pod event: %v", event.Object)
164+
continue
140165
}
141-
}
142-
}
143166

144-
func (pm *PoolManager) podDeleted(podPhases map[string]v1.PodPhase, podName string, pod *v1.Pod, exporter *Exporter, uriTemplate string) {
145-
ip := pod.Status.PodIP
146-
if ip != "" {
147-
uri := fmt.Sprintf(uriTemplate, ip)
148-
log.Infof("Removing pod %s with IP %s from PoolManager", podName, ip)
149-
pm.Remove(uri, exporter)
167+
uri := fmt.Sprintf(uriTemplate, pod.Status.PodIP, port)
168+
log.Debugf("Received event for pod %s: type=%s, phase=%s", pod.Name, event.Type, pod.Status.Phase)
169+
170+
switch event.Type {
171+
case apiWatch.Added:
172+
pm.processPodAdded(exporter, pod, uri)
173+
case apiWatch.Modified:
174+
pm.processPodModified(exporter, pod, uri)
175+
case apiWatch.Deleted:
176+
pm.processPodDeleted(exporter, pod, uri)
177+
}
150178
}
151-
delete(podPhases, podName)
152179
}

0 commit comments

Comments
 (0)