@@ -29,7 +29,6 @@ import (
29
29
"github.com/prometheus/common/model"
30
30
clientset "k8s.io/client-go/kubernetes"
31
31
"k8s.io/klog/v2"
32
-
33
32
)
34
33
35
34
const (
@@ -128,11 +127,11 @@ func (m *ksmLatencyMeasurement) Execute(config *measurement.Config) ([]measureme
128
127
// the scrape interval so we should cancel.
129
128
m .startQuerying (m .ctx , client , probeIntervalDefault )
130
129
// Retrieve initial latency when first call is done.
131
- m .initialLatency , err = m .retrieveLatencyMetrics (m .ctx , client )
130
+ m .initialLatency , err = m .retrieveKSMLatencyMetrics (m .ctx , client )
132
131
return nil , err
133
132
case "gather" :
134
133
defer m .cancel ()
135
- return m .createLatencySummary (m .ctx , client )
134
+ return m .createKSMLatencySummary (m .ctx , client )
136
135
default :
137
136
return nil , fmt .Errorf ("unknown action %v" , action )
138
137
}
@@ -152,45 +151,33 @@ func (m *crsmLatencyMeasurement) Execute(config *measurement.Config) ([]measurem
152
151
return nil , nil
153
152
}
154
153
m .startQuerying (m .ctx , client , probeIntervalDefault )
155
- m .initialLatency , err = m .retrieveLatencyMetrics (m .ctx , client )
154
+ m .initialLatency , err = m .retrieveCRSMLatencyMetrics (m .ctx , client )
156
155
return nil , err
157
156
case "gather" :
158
157
defer m .cancel ()
159
- return m .createLatencySummary (m .ctx , client )
158
+ return m .createCRSMLatencySummary (m .ctx , client )
160
159
default :
161
160
return nil , fmt .Errorf ("unknown action %v" , action )
162
161
}
163
162
}
164
163
165
- func getMetricsFromService (ctx context.Context , client clientset.Interface , namespace , serviceName string , port int ) (string , error ) {
166
- ctx , cancel := context .WithTimeout (ctx , 2 * time .Minute )
167
- defer cancel ()
168
- out , err := client .CoreV1 ().RESTClient ().Get ().
169
- Resource ("services" ).
170
- SubResource ("proxy" ).
171
- Namespace (namespace ).
172
- Name (fmt .Sprintf ("%v:%v" , serviceName , port )).
173
- Suffix ("metrics" ).
174
- Do (ctx ).Raw ()
175
- return string (out ), err
176
- }
177
-
178
164
func (m * ksmLatencyMeasurement ) stop () error {
179
- if ! m .isRunning {
180
- return fmt .Errorf ("%s: measurement was not running" , m )
181
- }
182
- m .isRunning = false
183
- m .cancel ()
184
- m .wg .Wait ()
185
- return nil
165
+ if ! m .isRunning {
166
+ return fmt .Errorf ("%s: measurement was not running" , m )
167
+ }
168
+ m .cancel ()
169
+ m .wg .Wait ()
170
+ return nil
186
171
}
187
172
188
- func (m * ksmLatencyMeasurement ) createLatencySummary (ctx context.Context , client clientset.Interface ) ([]measurement.Summary , error ) {
189
- latestLatency , err := m .retrieveLatencyMetrics (ctx , client )
173
+ func (m * ksmLatencyMeasurement ) createKSMLatencySummary (ctx context.Context , client clientset.Interface ) ([]measurement.Summary , error ) {
174
+ latestLatency , err := m .retrieveKSMLatencyMetrics (ctx , client )
190
175
if err != nil {
191
176
return nil , err
192
177
}
193
- m .stop ()
178
+ if err = m .stop (); err != nil {
179
+ return nil , err
180
+ }
194
181
// We want to subtract the latest histogram from the first one we collect.
195
182
finalLatency := HistogramSub (latestLatency , m .initialLatency )
196
183
// Pretty Print the report.
@@ -202,6 +189,12 @@ func (m *ksmLatencyMeasurement) createLatencySummary(ctx context.Context, client
202
189
if err != nil {
203
190
return nil , err
204
191
}
192
+
193
+ // Log the latency results for visibility
194
+ klog .Infof ("%s: Latency Results - P50: %v, P90: %v, P99: %v" ,
195
+ m , result .Perc50 , result .Perc90 , result .Perc99 )
196
+ klog .Infof ("%s: Final latency summary: %s" , m , content )
197
+
205
198
// Create Summary.
206
199
return []measurement.Summary {measurement .CreateSummary (ksmLatencyName , "json" , content )}, nil
207
200
}
@@ -211,43 +204,86 @@ func (m *ksmLatencyMeasurement) createLatencySummary(ctx context.Context, client
211
204
func (m * ksmLatencyMeasurement ) startQuerying (ctx context.Context , client clientset.Interface , interval time.Duration ) {
212
205
m .isRunning = true
213
206
m .wg .Add (1 )
214
- go func () {
215
- defer m .wg .Done ()
216
- for {
217
- select {
218
- case <- ctx .Done ():
219
- return
220
- case <- time .After (interval ):
221
- _ , err := getMetricsFromService (ctx , client , m .namespace , m .serviceName , m .metricsPort )
222
- if err != nil {
223
- klog .V (2 ).Infof ("error during fetching metrics from service: %v" , err )
224
- }
207
+ go m .queryLoop (ctx , client , interval )
208
+ }
209
+
210
+ func (m * ksmLatencyMeasurement ) queryLoop (ctx context.Context , client clientset.Interface , interval time.Duration ) {
211
+ defer m .wg .Done ()
212
+ queryCount := 0
213
+ for {
214
+ select {
215
+ case <- ctx .Done ():
216
+ klog .V (2 ).Infof ("%s: stopping query loop after %d queries" , m , queryCount )
217
+ return
218
+ case <- time .After (interval ):
219
+ queryCount ++
220
+ klog .V (4 ).Infof ("%s: executing query #%d" , m , queryCount )
221
+ output , err := m .getMetricsFromService (ctx , client , m .metricsPort )
222
+ if err != nil {
223
+ klog .V (2 ).Infof ("%s: error during fetching metrics from service (query #%d): %v" , m , queryCount , err )
224
+ }
225
+ if output == "" {
226
+ klog .V (2 ).Infof ("%s: /metrics endpoint returned no data in namespace: %s from service: %s port: %d" ,
227
+ m , m .namespace , m .serviceName , m .metricsPort )
228
+ } else {
229
+ klog .V (4 ).Infof ("%s: successfully fetched %d bytes from metrics endpoint (query #%d)" ,
230
+ m , len (output ), queryCount )
225
231
}
226
232
}
227
- }()
233
+ }
228
234
}
229
235
230
- func (m * ksmLatencyMeasurement ) retrieveLatencyMetrics (ctx context.Context , c clientset.Interface ) (* measurementutil.Histogram , error ) {
231
- hist := measurementutil .NewHistogram (nil )
232
- output , err := getMetricsFromService (ctx , c , m .namespace , m .serviceName , m .selfPort )
236
+ func (m * ksmLatencyMeasurement ) retrieveKSMLatencyMetrics (ctx context.Context , c clientset.Interface ) (* measurementutil.Histogram , error ) {
237
+ klog .V (4 ).Infof ("%s: retrieving KSM latency metrics" , m )
238
+ ksmHist := measurementutil .NewHistogram (nil )
239
+ output , err := m .getMetricsFromService (ctx , c , m .selfPort )
233
240
if err != nil {
234
- return hist , err
241
+ klog .Errorf ("%s: failed to get metrics from service: %v" , m , err )
242
+ return ksmHist , err
235
243
}
236
244
samples , err := measurementutil .ExtractMetricSamples (output )
237
245
if err != nil {
238
- return hist , err
246
+ klog .Errorf ("%s: failed to extract metric samples: %v" , m , err )
247
+ return ksmHist , err
239
248
}
249
+
250
+ sampleCount := 0
240
251
for _ , sample := range samples {
241
- if sample .Metric [model .MetricNameLabel ] == ksmRequestDurationMetricName {
242
- measurementutil .ConvertSampleToHistogram (sample , hist )
252
+ switch sample .Metric [model .MetricNameLabel ] {
253
+ case ksmRequestDurationMetricName :
254
+ measurementutil .ConvertSampleToHistogram (sample , ksmHist )
255
+ sampleCount ++
243
256
}
244
257
}
245
- return hist , nil
258
+ klog .V (4 ).Infof ("%s: processed %d histogram samples" , m , sampleCount )
259
+ return ksmHist , nil
260
+ }
261
+
262
+ func (m * ksmLatencyMeasurement ) getMetricsFromService (ctx context.Context , client clientset.Interface , port int ) (string , error ) {
263
+ ctx , cancel := context .WithTimeout (ctx , 2 * time .Minute )
264
+ defer cancel ()
265
+
266
+ klog .V (4 ).Infof ("%s: fetching metrics from %s/%s:%d" , m , m .namespace , m .serviceName , port )
267
+
268
+ out , err := client .CoreV1 ().RESTClient ().Get ().
269
+ Resource ("services" ).
270
+ SubResource ("proxy" ).
271
+ Namespace (m .namespace ).
272
+ Name (fmt .Sprintf ("%v:%v" , m .serviceName , port )).
273
+ Suffix ("metrics" ).
274
+ Do (ctx ).Raw ()
275
+
276
+ if err != nil {
277
+ klog .V (2 ).Infof ("%s: error fetching metrics from %s/%s:%d: %v" , m , m .namespace , m .serviceName , port , err )
278
+ }
279
+
280
+ return string (out ), err
246
281
}
282
+
247
283
// Dispose cleans up after the measurement.
248
284
func (m * ksmLatencyMeasurement ) Dispose () {
249
285
if err := m .stop (); err != nil {
250
- klog .V (2 ).Infof ("error during dispose call: %v" , err )
286
+ klog .V (2 ).Infof ("%s: error during dispose call: %v" , m , err )
251
287
}
252
288
}
253
289
@@ -257,21 +293,22 @@ func (m *ksmLatencyMeasurement) String() string {
257
293
}
258
294
259
295
func (m * crsmLatencyMeasurement ) stop () error {
260
- if ! m .isRunning {
261
- return fmt .Errorf ("%s: measurement was not running" , m )
262
- }
263
- m .isRunning = false
264
- m .cancel ()
265
- m .wg .Wait ()
266
- return nil
296
+ if ! m .isRunning {
297
+ return fmt .Errorf ("%s: measurement was not running" , m )
298
+ }
299
+ m .cancel ()
300
+ m .wg .Wait ()
301
+ return nil
267
302
}
268
303
269
- func (m * crsmLatencyMeasurement ) createLatencySummary (ctx context.Context , client clientset.Interface ) ([]measurement.Summary , error ) {
270
- latestLatency , err := m .retrieveLatencyMetrics (ctx , client )
304
+ func (m * crsmLatencyMeasurement ) createCRSMLatencySummary (ctx context.Context , client clientset.Interface ) ([]measurement.Summary , error ) {
305
+ latestLatency , err := m .retrieveCRSMLatencyMetrics (ctx , client )
271
306
if err != nil {
272
307
return nil , err
273
308
}
274
- m .stop ()
309
+ if err = m .stop (); err != nil {
310
+ return nil , err
311
+ }
275
312
finalLatency := HistogramSub (latestLatency , m .initialLatency )
276
313
result := & measurementutil.LatencyMetric {}
277
314
if err = SetQuantileFromHistogram (result , finalLatency ); err != nil {
@@ -281,49 +318,97 @@ func (m *crsmLatencyMeasurement) createLatencySummary(ctx context.Context, clien
281
318
if err != nil {
282
319
return nil , err
283
320
}
321
+
322
+ // Log the latency results for visibility
323
+ klog .Infof ("%s: Latency Results - P50: %v, P90: %v, P99: %v" ,
324
+ m , result .Perc50 , result .Perc90 , result .Perc99 )
325
+ klog .Infof ("%s: Final latency summary: %s" , m , content )
326
+
284
327
return []measurement.Summary {measurement .CreateSummary (crsmLatencyName , "json" , content )}, nil
285
328
}
286
329
287
330
func (m * crsmLatencyMeasurement ) startQuerying (ctx context.Context , client clientset.Interface , interval time.Duration ) {
288
331
m .isRunning = true
289
332
m .wg .Add (1 )
290
- go func () {
291
- defer m .wg .Done ()
292
- for {
293
- select {
294
- case <- ctx .Done ():
295
- return
296
- case <- time .After (interval ):
297
- _ , err := getMetricsFromService (ctx , client , m .namespace , m .serviceName , m .metricsPort )
298
- if err != nil {
299
- klog .V (2 ).Infof ("error during fetching metrics from service: %v" , err )
300
- }
333
+ go m .queryLoop (ctx , client , interval )
334
+ }
335
+
336
+ func (m * crsmLatencyMeasurement ) queryLoop (ctx context.Context , client clientset.Interface , interval time.Duration ) {
337
+ defer m .wg .Done ()
338
+ queryCount := 0
339
+ for {
340
+ select {
341
+ case <- ctx .Done ():
342
+ klog .V (2 ).Infof ("%s: stopping query loop after %d queries" , m , queryCount )
343
+ return
344
+ case <- time .After (interval ):
345
+ queryCount ++
346
+ klog .V (4 ).Infof ("%s: executing query #%d" , m , queryCount )
347
+ output , err := m .getMetricsFromService (ctx , client , m .metricsPort )
348
+ if err != nil {
349
+ klog .V (2 ).Infof ("%s: error during fetching metrics from service (query #%d): %v" , m , queryCount , err )
350
+ }
351
+ if output == "" {
352
+ klog .V (2 ).Infof ("%s: /metrics endpoint returned no data in namespace: %s from service: %s port: %d" ,
353
+ m , m .namespace , m .serviceName , m .metricsPort )
354
+ } else {
355
+ klog .V (4 ).Infof ("%s: successfully fetched %d bytes from metrics endpoint (query #%d)" ,
356
+ m , len (output ), queryCount )
301
357
}
302
358
}
303
- }()
359
+ }
304
360
}
305
361
306
- func (m * crsmLatencyMeasurement ) retrieveLatencyMetrics (ctx context.Context , c clientset.Interface ) (* measurementutil.Histogram , error ) {
307
- hist := measurementutil .NewHistogram (nil )
308
- output , err := getMetricsFromService (ctx , c , m .namespace , m .serviceName , m .selfPort )
362
+ func (m * crsmLatencyMeasurement ) retrieveCRSMLatencyMetrics (ctx context.Context , c clientset.Interface ) (* measurementutil.Histogram , error ) {
363
+ klog .V (4 ).Infof ("%s: retrieving CRSM latency metrics" , m )
364
+ crsmHist := measurementutil .NewHistogram (nil )
365
+ output , err := m .getMetricsFromService (ctx , c , m .selfPort )
309
366
if err != nil {
310
- return hist , err
367
+ klog .Errorf ("%s: failed to get metrics from service: %v" , m , err )
368
+ return crsmHist , err
311
369
}
312
370
samples , err := measurementutil .ExtractMetricSamples (output )
313
371
if err != nil {
314
- return hist , err
372
+ klog .Errorf ("%s: failed to extract metric samples: %v" , m , err )
373
+ return crsmHist , err
315
374
}
375
+
376
+ sampleCount := 0
316
377
for _ , sample := range samples {
317
- if sample .Metric [model .MetricNameLabel ] == ksmRequestDurationMetricName {
318
- measurementutil .ConvertSampleToHistogram (sample , hist )
378
+ switch sample .Metric [model .MetricNameLabel ] {
379
+ case ksmRequestDurationMetricName :
380
+ measurementutil .ConvertSampleToHistogram (sample , crsmHist )
381
+ sampleCount ++
319
382
}
320
383
}
321
- return hist , nil
384
+ klog .V (4 ).Infof ("%s: processed %d histogram samples" , m , sampleCount )
385
+ return crsmHist , nil
386
+ }
387
+
388
+ func (m * crsmLatencyMeasurement ) getMetricsFromService (ctx context.Context , client clientset.Interface , port int ) (string , error ) {
389
+ ctx , cancel := context .WithTimeout (ctx , 2 * time .Minute )
390
+ defer cancel ()
391
+
392
+ klog .V (4 ).Infof ("%s: fetching metrics from %s/%s:%d" , m , m .namespace , m .serviceName , port )
393
+
394
+ out , err := client .CoreV1 ().RESTClient ().Get ().
395
+ Resource ("services" ).
396
+ SubResource ("proxy" ).
397
+ Namespace (m .namespace ).
398
+ Name (fmt .Sprintf ("%v:%v" , m .serviceName , port )).
399
+ Suffix ("metrics" ).
400
+ Do (ctx ).Raw ()
401
+
402
+ if err != nil {
403
+ klog .V (2 ).Infof ("%s: error fetching metrics from %s/%s:%d: %v" , m , m .namespace , m .serviceName , port , err )
404
+ }
405
+
406
+ return string (out ), err
322
407
}
323
408
324
409
// Dispose cleans up after the measurement by stopping it. A failure to stop
// is only logged (verbosity 2); nothing is returned to the caller.
func (m *crsmLatencyMeasurement) Dispose() {
	err := m.stop()
	if err == nil {
		return
	}
	klog.V(2).Infof("%s: error during dispose call: %v", m, err)
}
329
414
0 commit comments