Commit 7d56f3f

Merge pull request #1 from hackthebox/Upgrade-exporter

Upgrade exporter to include functionality for Pod autodiscovery in k8s environments

2 parents: b088646 + 56db4ec

File tree

4 files changed: +275 lines, -8 lines

  cmd/server.go
  phpfpm/exporter.go
  phpfpm/phpfpm.go
  phpfpm/pod_discovery.go

cmd/server.go
Lines changed: 39 additions & 5 deletions

@@ -25,6 +25,7 @@ import (
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promhttp"
     "github.com/spf13/cobra"
+    v1 "k8s.io/api/core/v1"
 )
 
 // Configuration variables
@@ -33,6 +34,10 @@ var (
     metricsEndpoint string
     scrapeURIs []string
     fixProcessCount bool
+    k8sAutoTracking bool
+    namespace string
+    podLabels string
+    port string
 )
 
 // serverCmd represents the server command
@@ -48,14 +53,30 @@ to quickly create a Cobra application.`,
     Run: func(cmd *cobra.Command, args []string) {
         log.Infof("Starting server on %v with path %v", listeningAddress, metricsEndpoint)
 
-        pm := phpfpm.PoolManager{}
-
-        for _, uri := range scrapeURIs {
-            pm.Add(uri)
+        pm := phpfpm.PoolManager{
+            PodPhases: make(map[string]v1.PodPhase),
         }
-
+        // Initialize the Exporter before any dynamic or static setup
         exporter := phpfpm.NewExporter(pm)
 
+        // Enable dynamic pod tracking if the flag is set
+        if k8sAutoTracking {
+            log.Info("Kubernetes auto-tracking enabled. Watching for pod changes...")
+
+            go func() {
+                if err := pm.DiscoverPods(exporter, namespace, podLabels, port); err != nil {
+                    log.Error(err)
+                }
+            }()
+
+        } else {
+            // Static scraping of predefined URIs
+            for _, uri := range scrapeURIs {
+                pm.Add(uri)
+            }
+            exporter.UpdatePoolManager(pm)
+        }
+
         if fixProcessCount {
             log.Info("Idle/Active/Total Processes will be calculated by php-fpm_exporter.")
             exporter.CountProcessState = true
@@ -121,18 +142,31 @@ to quickly create a Cobra application.`,
 func init() {
     RootCmd.AddCommand(serverCmd)
 
+    // Web
     serverCmd.Flags().StringVar(&listeningAddress, "web.listen-address", ":9253", "Address on which to expose metrics and web interface.")
     serverCmd.Flags().StringVar(&metricsEndpoint, "web.telemetry-path", "/metrics", "Path under which to expose metrics.")
+
+    // PHP FPM
     serverCmd.Flags().StringSliceVar(&scrapeURIs, "phpfpm.scrape-uri", []string{"tcp://127.0.0.1:9000/status"}, "FastCGI address, e.g. unix:///tmp/php.sock;/status or tcp://127.0.0.1:9000/status")
     serverCmd.Flags().BoolVar(&fixProcessCount, "phpfpm.fix-process-count", false, "Enable to calculate process numbers via php-fpm_exporter since PHP-FPM sporadically reports wrong active/idle/total process numbers.")
 
+    // Kubernetes
+    serverCmd.Flags().BoolVar(&k8sAutoTracking, "k8s.autotracking", false, "Enable automatic tracking of PHP-FPM pods in Kubernetes.")
+    serverCmd.Flags().StringVarP(&namespace, "k8s.namespace", "n", "", "Kubernetes namespace to monitor (defaults to all namespaces if not set)")
+    serverCmd.Flags().StringVarP(&podLabels, "k8s.pod-labels", "l", "php-fpm-exporter/collect=true", "Kubernetes pod labels as a list of key-value pairs")
+    serverCmd.Flags().StringVarP(&port, "k8s.port", "p", "9000", "Kubernetes pod port")
+
     // Workaround since vipers BindEnv is currently not working as expected (see https://github.com/spf13/viper/issues/461)
 
     envs := map[string]string{
         "PHP_FPM_WEB_LISTEN_ADDRESS": "web.listen-address",
         "PHP_FPM_WEB_TELEMETRY_PATH": "web.telemetry-path",
         "PHP_FPM_SCRAPE_URI": "phpfpm.scrape-uri",
         "PHP_FPM_FIX_PROCESS_COUNT": "phpfpm.fix-process-count",
+        "PHP_FPM_K8S_AUTOTRACKING": "k8s.autotracking",
+        "PHP_FPM_K8S_NAMESPACE": "k8s.namespace",
+        "PHP_FPM_K8S_POD_LABELS": "k8s.pod-labels",
+        "PHP_FPM_K8S_POD_PORT": "k8s.port",
     }
 
     mapEnvVars(envs, serverCmd)
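
The new flags and environment-variable mappings above let the exporter switch between static scraping and Kubernetes autodiscovery at start-up. The following invocation is only a sketch: the "php-fpm_exporter server" command name is assumed from the Cobra wiring in this file, and the namespace value is a placeholder.

    # Flag form
    php-fpm_exporter server \
      --k8s.autotracking \
      --k8s.namespace=my-namespace \
      --k8s.pod-labels="php-fpm-exporter/collect=true" \
      --k8s.port=9000

    # Equivalent environment-variable form, per the envs map above
    PHP_FPM_K8S_AUTOTRACKING=true \
    PHP_FPM_K8S_NAMESPACE=my-namespace \
    PHP_FPM_K8S_POD_LABELS="php-fpm-exporter/collect=true" \
    PHP_FPM_K8S_POD_PORT=9000 \
    php-fpm_exporter server

When k8s.autotracking is left at its default (false), the Run block above falls back to the static phpfpm.scrape-uri list.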

phpfpm/exporter.go
Lines changed: 9 additions & 0 deletions

@@ -255,3 +255,12 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
     ch <- e.processLastRequestCPU
     ch <- e.processRequestDuration
 }
+
+// UpdatePoolManager updates the Pool Manager
+func (e *Exporter) UpdatePoolManager(newPM PoolManager) {
+    e.mutex.Lock()
+    defer e.mutex.Unlock()
+
+    e.PoolManager = newPM
+    log.Info("PoolManager has been updated")
+}
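
UpdatePoolManager takes the exporter's mutex before swapping the embedded PoolManager, so the pod-discovery goroutine can replace the pool list while scrapes are running. The sketch below illustrates that guarded-swap pattern in isolation; the type and method names are stand-ins, not the exporter's real implementation, and it assumes the collect path holds the same lock.

    package main

    import (
        "fmt"
        "sync"
    )

    // poolManager stands in for phpfpm.PoolManager in this sketch.
    type poolManager struct {
        URIs []string
    }

    // exporter mirrors the guarded-swap pattern: readers and writers of the
    // embedded pool manager take the same mutex.
    type exporter struct {
        mutex       sync.Mutex
        poolManager poolManager
    }

    // collect is a stand-in for a scrape that reads the pool list.
    func (e *exporter) collect() {
        e.mutex.Lock()
        defer e.mutex.Unlock()
        fmt.Println("scraping", e.poolManager.URIs)
    }

    // updatePoolManager swaps in a new pool manager: lock, replace, unlock.
    func (e *exporter) updatePoolManager(pm poolManager) {
        e.mutex.Lock()
        defer e.mutex.Unlock()
        e.poolManager = pm
    }

    func main() {
        e := &exporter{}
        var wg sync.WaitGroup
        wg.Add(2)
        go func() {
            defer wg.Done()
            e.updatePoolManager(poolManager{URIs: []string{"tcp://10.0.0.5:9000/status"}})
        }()
        go func() {
            defer wg.Done()
            e.collect()
        }()
        wg.Wait()
    }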

phpfpm/phpfpm.go
Lines changed: 29 additions & 3 deletions

@@ -26,6 +26,7 @@ import (
     "time"
 
     fcgiclient "github.com/tomasen/fcgi_client"
+    v1 "k8s.io/api/core/v1"
 )
 
 // PoolProcessRequestIdle defines a process that is idle.
@@ -60,7 +61,8 @@ type logger interface {
 
 // PoolManager manages all configured Pools
 type PoolManager struct {
-    Pools []Pool `json:"pools"`
+    Pools     []Pool                 `json:"pools"`
+    PodPhases map[string]v1.PodPhase `json:"podPhases"`
 }
 
 // Pool describes a single PHP-FPM pool that can be reached via a Socket or TCP address
@@ -147,6 +149,32 @@ func (pm *PoolManager) Update() (err error) {
     return nil
 }
 
+// Remove will remove a pool from the pool manager based on the given URI.
+func (pm *PoolManager) Remove(exporter *Exporter, uri string) {
+    wg := &sync.WaitGroup{}
+
+    started := time.Now()
+
+    for idx := range pm.Pools {
+        if pm.Pools[idx].Address == uri {
+            wg.Add(1)
+            go func(i int) {
+                defer wg.Done()
+
+                // Remove the pool by updating the Pools slice
+                log.Debugf("Removing pool: %s", uri)
+                pm.Pools = append(pm.Pools[:i], pm.Pools[i+1:]...)
+            }(idx)
+        }
+    }
+
+    wg.Wait()
+
+    ended := time.Now()
+    log.Debugf("Removed pools in %v", ended.Sub(started))
+    exporter.UpdatePoolManager(*pm)
+}
+
 // Update will connect to PHP-FPM and retrieve the latest data for the pool.
 func (p *Pool) Update() (err error) {
     p.ScrapeError = nil
@@ -185,8 +213,6 @@ func (p *Pool) Update() (err error) {
 
     content = JSONResponseFixer(content)
 
-    log.Debugf("Pool[%v]: %v", p.Address, string(content))
-
     if err = json.Unmarshal(content, &p); err != nil {
         log.Errorf("Pool[%v]: %v", p.Address, string(content))
         return p.error(err)
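
The Remove method added above filters pm.Pools by Address and then hands the updated manager back to the exporter. For comparison, the same filtering step can be written as a plain, non-concurrent helper; removeByAddress below is a hypothetical sketch against the package's Pool type and is not part of this commit.

    // removeByAddress returns a copy of pools with every entry whose Address
    // equals uri left out. Hypothetical helper, shown for illustration only.
    func removeByAddress(pools []Pool, uri string) []Pool {
        kept := make([]Pool, 0, len(pools))
        for _, p := range pools {
            if p.Address != uri {
                kept = append(kept, p)
            }
        }
        return kept
    }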

phpfpm/pod_discovery.go
Lines changed: 198 additions & 0 deletions

@@ -0,0 +1,198 @@
+package phpfpm
+
+import (
+    "context"
+    "fmt"
+
+    v1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    apiWatch "k8s.io/apimachinery/pkg/watch"
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/rest"
+    "k8s.io/client-go/tools/cache"
+    "k8s.io/client-go/tools/watch"
+)
+
+const uriTemplate string = "tcp://%s:%s/status"
+
+// customWatcher is a custom implementation of the cache.Watcher interface,
+// designed to watch Kubernetes pods based on specific label selectors and namespace.
+type customWatcher struct {
+    clientset     *kubernetes.Clientset
+    labelSelector string
+    namespace     string
+}
+
+// newWatcher creates and returns a new instance of customWatcher.
+// It initializes the watcher with a Kubernetes clientset, a namespace, and a label selector for filtering the pods to be monitored.
+func newWatcher(clientset *kubernetes.Clientset, namespace string, podLabels string) cache.Watcher {
+    return &customWatcher{
+        clientset:     clientset,
+        namespace:     namespace,
+        labelSelector: podLabels,
+    }
+}
+
+// Watch initiates a new watch session for Pods by establishing a connection to the Kubernetes API.
+// This function is used as part of the NewRetryWatcher setup, which ensures a resilient connection.
+// If the connection to the API is interrupted, the NewRetryWatcher will automatically attempt to re-establish it,
+// providing continuous monitoring of pod events. This approach is ideal for maintaining reliable event streaming,
+// especially in cases of network instability or API server disruptions.
+func (c *customWatcher) Watch(options metav1.ListOptions) (apiWatch.Interface, error) {
+    options.LabelSelector = c.labelSelector
+    ns := c.namespace
+    if ns == "" {
+        ns = metav1.NamespaceAll
+    }
+    return c.clientset.CoreV1().Pods(c.namespace).Watch(context.TODO(), options)
+}
+
+// k8sGetClient returns a Kubernetes clientset to interact with the cluster.
+// This is intended to be used when the application is running inside a Kubernetes pod.
+func k8sGetClient() (*kubernetes.Clientset, error) {
+    config, err := rest.InClusterConfig()
+    if err != nil {
+        return nil, fmt.Errorf("failed to create in-cluster config: %v", err)
+    }
+
+    // Create a Kubernetes clientset using the in-cluster config
+    clientset, err := kubernetes.NewForConfig(config)
+    if err != nil {
+        return nil, fmt.Errorf("failed to create Kubernetes clientset: %v", err)
+    }
+
+    return clientset, nil
+}
+
+// listPods retrieves the initial list of pods that match the specified label criteria and namespace.
+func listPods(clientset *kubernetes.Clientset, namespace string, podLabels string) (*v1.PodList, error) {
+    if namespace == "" {
+        namespace = metav1.NamespaceAll
+    }
+    podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: podLabels})
+    if err != nil {
+        return nil, fmt.Errorf("failed to list pods: %w", err)
+    }
+    return podList, nil
+}
+
+// initialPodEnlisting processes all pods matching the specified criteria, records their phases, and adds running pods to the pool manager.
+// It is invoked prior to starting the NewRetryWatcher to capture the initial state of existing pods
+// and to obtain the ResourceVersion required for initializing the NewRetryWatcher.
+func (pm *PoolManager) initialPodEnlisting(exporter *Exporter, podList *v1.PodList, port string) (string, error) {
+
+    log.Infof("Found %d pod(s) during initial list", len(podList.Items))
+    for _, pod := range podList.Items {
+        podName := pod.Name
+        currentPhase := pod.Status.Phase
+        log.Debugf("Processing pod from initial list: %s, phase: %s", podName, currentPhase)
+
+        uri := fmt.Sprintf(uriTemplate, pod.Status.PodIP, port)
+        pm.processPodAdded(exporter, &pod, uri)
+    }
+    return podList.ResourceVersion, nil
+}
+
+// handlePodRunning is used when a pod is in the Running phase and needs to be added to the pool manager.
+func (pm *PoolManager) handlePodRunning(exporter *Exporter, pod *v1.Pod, uri string) {
+    ip := pod.Status.PodIP
+    podName := pod.Name
+    if ip != "" {
+        log.Infof("Pod in Running state detected %s with IP %s. Adding in the Pool Manager..", podName, ip)
+        pm.Add(uri)
+        exporter.UpdatePoolManager(*pm)
+    } else {
+        log.Debugf("Pod %s is in Running state but has no IP assigned", podName)
+    }
+}
+
+// processPodAdded handles the addition of a newly created pod to the cluster by appending its URI to the pool manager.
+func (pm *PoolManager) processPodAdded(exporter *Exporter, pod *v1.Pod, uri string) {
+    pm.PodPhases[pod.Name] = pod.Status.Phase
+
+    if pod.Status.Phase == v1.PodRunning {
+        pm.handlePodRunning(exporter, pod, uri)
+    }
+}
+
+// processPodModified handles events triggered by pod modifications, including those fired as a newly added pod progresses through its phases.
+// To be included in the pool manager, the pod must be in the "Running" phase. The function checks the pod's phase
+// transition and, when it moves from Pending to Running, calls handlePodRunning to add the pod to the pool manager.
+func (pm *PoolManager) processPodModified(exporter *Exporter, pod *v1.Pod, uri string) {
+    podName := pod.Name
+    currentPhase := pod.Status.Phase
+    lastPhase, exists := pm.PodPhases[podName]
+
+    if exists && lastPhase == v1.PodPending && currentPhase == v1.PodRunning {
+        log.Infof("Pod %s transitioned from Pending to Running", podName)
+        pm.handlePodRunning(exporter, pod, uri)
+    }
+    pm.PodPhases[podName] = currentPhase
+}
+
+// processPodDeleted removes a pod's URI from the pool manager and deletes its entry from PodPhases.
+func (pm *PoolManager) processPodDeleted(exporter *Exporter, pod *v1.Pod, uri string) {
+    ip := pod.Status.PodIP
+
+    log.Infof("Removing pod %s with IP %s from PoolManager", pod.Name, ip)
+    pm.Remove(exporter, uri)
+
+    delete(pm.PodPhases, pod.Name)
+}
+
+// DiscoverPods begins by listing the pods that match the specified labels within the given namespace.
+// It then starts a watch session in a separate goroutine.
+// The list operation is performed first to retrieve the initial ResourceVersion, which is required to initialize a NewRetryWatcher.
+func (pm *PoolManager) DiscoverPods(exporter *Exporter, namespace string, podLabels string, port string) error {
+    // Get the Kubernetes client
+    clientset, err := k8sGetClient()
+    if err != nil {
+        return err
+    }
+
+    watcher := newWatcher(clientset, namespace, podLabels)
+
+    podList, err := listPods(clientset, namespace, podLabels)
+    initialResourceVersion, err := pm.initialPodEnlisting(exporter, podList, port)
+    if err != nil {
+        return err
+    }
+
+    go pm.watchPodEvents(exporter, watcher, initialResourceVersion, port)
+    return nil
+}
+
+// watchPodEvents monitors pod events and processes them accordingly:
+// - For "added" events, the new pod's URI is appended to the pool manager.
+// - For "modified" events, it verifies that the pod is in the Running state before appending its URI to the pool manager.
+// - For "deleted" events, the pod's URI is removed from the pool manager and its entry is deleted from PodPhases.
+// Note: There is an unresolved issue with timeout errors when a pod is deleted, which requires further investigation and handling.
+func (pm *PoolManager) watchPodEvents(exporter *Exporter, watcher cache.Watcher, resourceVersion string, port string) {
+    retryWatcher, err := watch.NewRetryWatcher(resourceVersion, watcher)
+    if err != nil {
+        log.Errorf("Failed to create Retry Watcher: %v", err)
+        return
+    }
+    defer retryWatcher.Stop()
+    log.Info("Retry Watcher initialized successfully")
+
+    for event := range retryWatcher.ResultChan() {
+        pod, ok := event.Object.(*v1.Pod)
+        if !ok {
+            log.Errorf("Unexpected type in pod event: %v", event.Object)
+            continue
+        }
+
+        uri := fmt.Sprintf(uriTemplate, pod.Status.PodIP, port)
+        log.Debugf("Received event for pod %s: type=%s, phase=%s", pod.Name, event.Type, pod.Status.Phase)
+
+        switch event.Type {
+        case apiWatch.Added:
+            pm.processPodAdded(exporter, pod, uri)
+        case apiWatch.Modified:
+            pm.processPodModified(exporter, pod, uri)
+        case apiWatch.Deleted:
+            pm.processPodDeleted(exporter, pod, uri)
+        }
+    }
+}
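
Discovery is driven by the label selector and namespace passed on the command line (the default selector is php-fpm-exporter/collect=true) and talks to the in-cluster API via rest.InClusterConfig, so the service account the exporter runs under must be allowed to list and watch pods. As an illustration only, with a placeholder pod name, a pod can be opted into discovery by applying the default label:

    kubectl label pod my-php-fpm-pod php-fpm-exporter/collect=true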
