From 794ba277ef65c88f06615ed50648b1582ebd171e Mon Sep 17 00:00:00 2001 From: pgrefviau Date: Mon, 6 Jan 2025 12:05:55 -0800 Subject: [PATCH] [FLINK-36932] Added resource-level metrics for different states/statuses --- .../metrics/FlinkDeploymentMetrics.java | 137 +++++++++-- .../metrics/lifecycle/LifecycleMetrics.java | 90 +++++-- .../ResourceLifecycleMetricTracker.java | 7 + .../metrics/FlinkDeploymentMetricsTest.java | 225 +++++++++++++----- .../metrics/TestingMetricListener.java | 5 + .../ResourceLifecycleMetricsTest.java | 61 +++-- 6 files changed, 401 insertions(+), 124 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java index 310bb75a5f..9dac15556c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.metrics; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; @@ -24,6 +25,7 @@ import org.apache.flink.runtime.rest.messages.DashboardConfiguration; import org.apache.flink.util.StringUtils; +import org.apache.commons.lang3.EnumUtils; import org.apache.commons.lang3.math.NumberUtils; import java.util.Map; @@ -37,8 +39,9 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics>> deploymentStatuses = + private final Map>> jmDeploymentStatuses = new ConcurrentHashMap<>(); + private final Map>> jobStatuses = new ConcurrentHashMap<>(); // map(namespace, map(version, set(deployment))) private final Map>> deploymentFlinkVersions = new ConcurrentHashMap<>(); @@ -53,9 +56,11 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics { - initNamespaceDeploymentCounts(ns); - initNamespaceStatusCounts(ns); - return createDeploymentStatusMap(); - }) - .get(flinkApp.getStatus().getJobManagerDeploymentStatus()) - .add(deploymentName); + initJmDeploymentMetrics(namespace, deploymentName, flinkApp); + initJobMetrics(namespace, deploymentName, flinkApp); // Full runtime version queried from the JobManager REST API var flinkVersion = - flinkApp.getStatus() + flinkAppState .getClusterInfo() .getOrDefault(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, ""); if (StringUtils.isNullOrWhitespaceOnly(flinkVersion)) { @@ -146,12 +145,61 @@ public void onUpdate(FlinkDeployment flinkApp) { AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "0"))); } + private void initJmDeploymentMetrics( + String namespace, String deploymentName, FlinkDeployment flinkApp) { + var currentJmDeploymentStatus = flinkApp.getStatus().getJobManagerDeploymentStatus(); + + boolean deploymentRegistrationOccurred = + jmDeploymentStatuses + .computeIfAbsent( + namespace, + ns -> { + initNamespaceDeploymentCounts(ns); + initNamespaceJmDeploymentStatusCounts(ns); + return createStatusMapFromEnum( + JobManagerDeploymentStatus.class); + }) + .get(currentJmDeploymentStatus) + .add(deploymentName); + + if (deploymentRegistrationOccurred) { + initJmDeploymentStatusGauges(namespace, deploymentName); + } + } + + private void initJobMetrics(String namespace, String deploymentName, FlinkDeployment flinkApp) { + var jobStatusDetails = flinkApp.getStatus().getJobStatus(); + var jobStatus = jobStatusDetails.getState(); + if (jobStatus == null) { + return; + } + + boolean deploymentRegistrationOccurred = + jobStatuses + .computeIfAbsent( + namespace, + ns -> { + initNamespaceJobStatusCounts(ns); + return createStatusMapFromEnum(JobStatus.class); + }) + .get(jobStatus) + .add(deploymentName); + + if (deploymentRegistrationOccurred) { + initJobStatusGauges(namespace, deploymentName); + } + } + + @Override public void onRemove(FlinkDeployment flinkApp) { var namespace = flinkApp.getMetadata().getNamespace(); var name = flinkApp.getMetadata().getName(); - if (deploymentStatuses.containsKey(namespace)) { - deploymentStatuses.get(namespace).values().forEach(names -> names.remove(name)); + if (jmDeploymentStatuses.containsKey(namespace)) { + jmDeploymentStatuses.get(namespace).values().forEach(names -> names.remove(name)); + } + if (jobStatuses.containsKey(namespace)) { + jobStatuses.get(namespace).values().forEach(names -> names.remove(name)); } if (deploymentFlinkVersions.containsKey(namespace)) { deploymentFlinkVersions.get(namespace).values().forEach(names -> names.remove(name)); @@ -176,18 +224,60 @@ private void initNamespaceDeploymentCounts(String ns) { .gauge( COUNTER_NAME, () -> - deploymentStatuses.get(ns).values().stream() + jmDeploymentStatuses.get(ns).values().stream() .mapToInt(Set::size) .sum()); } - private void initNamespaceStatusCounts(String ns) { + private void initNamespaceJmDeploymentStatusCounts(String ns) { for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) { parentMetricGroup .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns) - .addGroup(STATUS_GROUP_NAME) + .addGroup(JM_DEPLOYMENT_STATUS_GROUP_NAME) .addGroup(status.toString()) - .gauge(COUNTER_NAME, () -> deploymentStatuses.get(ns).get(status).size()); + .gauge(COUNTER_NAME, () -> jmDeploymentStatuses.get(ns).get(status).size()); + } + } + + private void initNamespaceJobStatusCounts(String ns) { + for (JobStatus status : JobStatus.values()) { + parentMetricGroup + .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns) + .addGroup(JOB_STATUS_GROUP_NAME) + .addGroup(status.toString()) + .gauge(COUNTER_NAME, () -> jobStatuses.get(ns).get(status).size()); + } + } + + private void initJmDeploymentStatusGauges(String ns, String deploymentName) { + for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) { + parentMetricGroup + .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns) + .createResourceGroup(configuration, deploymentName) + .addGroup(JM_DEPLOYMENT_STATUS_GROUP_NAME) + .addGroup(status.toString()) + .gauge( + IN_STATUS_NAME, + () -> + jmDeploymentStatuses + .get(ns) + .get(status) + .contains(deploymentName) + ? 1 + : 0); + } + } + + private void initJobStatusGauges(String ns, String deploymentName) { + for (JobStatus status : JobStatus.values()) { + parentMetricGroup + .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns) + .createResourceGroup(configuration, deploymentName) + .addGroup(JOB_STATUS_GROUP_NAME) + .addGroup(status.toString()) + .gauge( + IN_STATUS_NAME, + () -> jobStatuses.get(ns).get(status).contains(deploymentName) ? 1 : 0); } } @@ -231,9 +321,10 @@ private void initNamespaceMemoryUsage(String ns) { .reduce(0L, Long::sum)); } - private Map> createDeploymentStatusMap() { - Map> statuses = new ConcurrentHashMap<>(); - for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) { + private static > Map> createStatusMapFromEnum( + Class statusType) { + Map> statuses = new ConcurrentHashMap<>(); + for (T status : statusType.getEnumConstants()) { statuses.put(status, ConcurrentHashMap.newKeySet()); } return statuses; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java index 0019cf22c0..adab558a65 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java @@ -68,9 +68,11 @@ public class LifecycleMetrics> public static final List TRACKED_TRANSITIONS = getTrackedTransitions(); + // map(tuple(namespace, resource), tracker) private final Map, ResourceLifecycleMetricTracker> lifecycleTrackers = new ConcurrentHashMap<>(); private final Set namespaces = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set resourceNames = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Clock clock; private final KubernetesOperatorMetricGroup operatorMetricGroup; @@ -104,6 +106,7 @@ public void onUpdate(CR cr) { @Override public void onRemove(CR cr) { + resourceNames.remove(cr.getMetadata().getName()); lifecycleTrackers.remove( Tuple2.of(cr.getMetadata().getNamespace(), cr.getMetadata().getName())); } @@ -120,6 +123,8 @@ private ResourceLifecycleMetricTracker getLifecycleMetricTracker(CR cr) { ? Instant.parse(cr.getMetadata().getCreationTimestamp()) : clock.instant(); return new ResourceLifecycleMetricTracker( + cr.getMetadata().getNamespace(), + cr.getMetadata().getName(), initialState, time, getTransitionHistograms(cr), @@ -129,32 +134,75 @@ private ResourceLifecycleMetricTracker getLifecycleMetricTracker(CR cr) { private void createNamespaceStateCountIfMissing(CR cr) { var namespace = cr.getMetadata().getNamespace(); - if (!namespaces.add(namespace)) { - return; + + if (namespaces.add(namespace)) { + var resourceNamespaceGroup = + operatorMetricGroup.createResourceNamespaceGroup( + config, cr.getClass(), namespace); + MetricGroup lifecycleNamespaceGroup = metricGroupFunction.apply(resourceNamespaceGroup); + + for (ResourceLifecycleState state : ResourceLifecycleState.values()) { + lifecycleNamespaceGroup + .addGroup("State") + .addGroup(state.name()) + .gauge( + "Count", + () -> + lifecycleTrackers.values().stream() + .filter( + tracker -> + isMatchingTrackerWithState( + tracker, namespace, state)) + .count()); + } } - MetricGroup lifecycleGroup = - metricGroupFunction.apply( - operatorMetricGroup.createResourceNamespaceGroup( - config, cr.getClass(), namespace)); - for (ResourceLifecycleState state : ResourceLifecycleState.values()) { - lifecycleGroup - .addGroup("State") - .addGroup(state.name()) - .gauge( - "Count", - () -> - lifecycleTrackers.entrySet().stream() - .filter( - tracker -> - tracker.getKey().f0.equals(namespace) - && tracker.getValue() - .getCurrentState() - == state) - .count()); + var resourceName = cr.getMetadata().getName(); + if (resourceNames.add(resourceName)) { + var resourceNamespaceGroup = + operatorMetricGroup.createResourceNamespaceGroup( + config, cr.getClass(), namespace); + MetricGroup lifecycleResourceGroup = + metricGroupFunction.apply( + resourceNamespaceGroup.createResourceGroup(config, resourceName)); + + for (ResourceLifecycleState state : ResourceLifecycleState.values()) { + lifecycleResourceGroup + .addGroup("State") + .addGroup(state.name()) + .gauge( + "InState", + () -> + lifecycleTrackers.values().stream() + .anyMatch( + tracker -> + isMatchingTrackerWithState( + tracker, + namespace, + resourceName, + state)) + ? 1 + : 0); + } } } + private static boolean isMatchingTrackerWithState( + ResourceLifecycleMetricTracker tracker, + String namespace, + ResourceLifecycleState state) { + return tracker.getNamespace().equals(namespace) && tracker.getCurrentState() == state; + } + + private static boolean isMatchingTrackerWithState( + ResourceLifecycleMetricTracker tracker, + String namespace, + String resourceName, + ResourceLifecycleState state) { + return isMatchingTrackerWithState(tracker, namespace, state) + && tracker.getResourceName().equals(resourceName); + } + private synchronized void init(CR cr) { if (transitionMetrics != null) { return; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java index deec24f944..e4298a280d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java @@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.metrics.Histogram; +import lombok.Getter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,8 @@ public class ResourceLifecycleMetricTracker { private static final Logger LOG = LoggerFactory.getLogger(ResourceLifecycleMetricTracker.class); + @Getter private final String namespace; + @Getter private final String resourceName; private final Map> transitionHistos; private final Map> stateTimeHistos; @@ -45,10 +48,14 @@ public class ResourceLifecycleMetricTracker { private ResourceLifecycleState currentState; public ResourceLifecycleMetricTracker( + String namespace, + String resourceName, ResourceLifecycleState initialState, Instant time, Map> transitionHistos, Map> stateTimeHistos) { + this.namespace = namespace; + this.resourceName = resourceName; this.transitionHistos = transitionHistos; this.currentState = initialState; this.stateTimeHistos = stateTimeHistos; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java index 1776cad2e0..31db106ef5 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.metrics; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; @@ -36,9 +37,11 @@ import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.CPU_NAME; import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_MINOR_VERSION_GROUP_NAME; import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_VERSION_GROUP_NAME; +import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.IN_STATUS_NAME; +import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.JM_DEPLOYMENT_STATUS_GROUP_NAME; +import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.JOB_STATUS_GROUP_NAME; import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.MEMORY_NAME; import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.RESOURCE_USAGE_GROUP_NAME; -import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATUS_GROUP_NAME; import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -67,18 +70,33 @@ public void testMetricsSameNamespace() { var deployment1 = TestUtils.buildApplicationCluster("deployment1", namespace); var deployment2 = TestUtils.buildApplicationCluster("deployment2", namespace); - var counterId = - listener.getNamespaceMetricId(FlinkDeployment.class, namespace, COUNTER_NAME); + var counterId = getMetricIdForTotalDeploymentsInNamespace(namespace); assertTrue(listener.getGauge(counterId).isEmpty()); for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) { - var statusId = - listener.getNamespaceMetricId( - FlinkDeployment.class, - namespace, - STATUS_GROUP_NAME, - status.name(), - COUNTER_NAME); + var namespaceStatusId = + getMetricIdForTotalJmDeploymentsWithStatusInNamespace(namespace, status); + assertTrue(listener.getGauge(namespaceStatusId).isEmpty()); + + var deployment1StatusId = + getMetricIdForJmDeploymentStatusTrackerGauge(namespace, deployment1, status); + assertTrue(listener.getGauge(deployment1StatusId).isEmpty()); + + var deployment2StatusId = + getMetricIdForJmDeploymentStatusTrackerGauge(namespace, deployment2, status); + assertTrue(listener.getGauge(deployment2StatusId).isEmpty()); + } + + for (JobStatus status : JobStatus.values()) { + var statusId = getMetricIdForTotalJobsWithStatusInNamespace(namespace, status); assertTrue(listener.getGauge(statusId).isEmpty()); + + var deployment1StatusId = + getMetricIdForJobStatusTrackerGauge(namespace, deployment1, status); + assertTrue(listener.getGauge(deployment1StatusId).isEmpty()); + + var deployment2StatusId = + getMetricIdForJobStatusTrackerGauge(namespace, deployment2, status); + assertTrue(listener.getGauge(deployment2StatusId).isEmpty()); } metricManager.onUpdate(deployment1); @@ -90,31 +108,114 @@ public void testMetricsSameNamespace() { metricManager.onUpdate(deployment1); metricManager.onUpdate(deployment2); - var statusId = - listener.getNamespaceMetricId( - FlinkDeployment.class, - namespace, - STATUS_GROUP_NAME, - status.name(), - COUNTER_NAME); + var statusId = getMetricIdForTotalJmDeploymentsWithStatusInNamespace(namespace, status); + assertEquals(2, listener.getGauge(statusId).get().getValue()); + + var deployment1StatusId = + getMetricIdForJmDeploymentStatusTrackerGauge(namespace, deployment1, status); + assertEquals(1, listener.getGauge(deployment1StatusId).get().getValue()); + + var deployment2StatusId = + getMetricIdForJmDeploymentStatusTrackerGauge(namespace, deployment2, status); + assertEquals(1, listener.getGauge(deployment2StatusId).get().getValue()); + } + + for (JobStatus status : JobStatus.values()) { + deployment1.getStatus().getJobStatus().setState(status); + deployment2.getStatus().getJobStatus().setState(status); + metricManager.onUpdate(deployment1); + metricManager.onUpdate(deployment2); + + var statusId = getMetricIdForTotalJobsWithStatusInNamespace(namespace, status); assertEquals(2, listener.getGauge(statusId).get().getValue()); + + var deployment1StatusId = + getMetricIdForJobStatusTrackerGauge(namespace, deployment1, status); + assertEquals(1, listener.getGauge(deployment1StatusId).get().getValue()); + + var deployment2StatusId = + getMetricIdForJobStatusTrackerGauge(namespace, deployment2, status); + assertEquals(1, listener.getGauge(deployment2StatusId).get().getValue()); } metricManager.onRemove(deployment1); metricManager.onRemove(deployment2); assertEquals(0, listener.getGauge(counterId).get().getValue()); for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) { - var statusId = - listener.getNamespaceMetricId( - FlinkDeployment.class, - namespace, - STATUS_GROUP_NAME, - status.name(), - COUNTER_NAME); + var statusId = getMetricIdForTotalJmDeploymentsWithStatusInNamespace(namespace, status); + assertEquals(0, listener.getGauge(statusId).get().getValue()); + + var deployment1StatusId = + getMetricIdForJmDeploymentStatusTrackerGauge(namespace, deployment1, status); + assertEquals(0, listener.getGauge(deployment1StatusId).get().getValue()); + + var deployment2StatusId = + getMetricIdForJmDeploymentStatusTrackerGauge(namespace, deployment2, status); + assertEquals(0, listener.getGauge(deployment2StatusId).get().getValue()); + } + + for (JobStatus status : JobStatus.values()) { + var statusId = getMetricIdForTotalJobsWithStatusInNamespace(namespace, status); assertEquals(0, listener.getGauge(statusId).get().getValue()); + + var deployment1StatusId = + getMetricIdForJobStatusTrackerGauge(namespace, deployment1, status); + assertEquals(0, listener.getGauge(deployment1StatusId).get().getValue()); + + var deployment2StatusId = + getMetricIdForJobStatusTrackerGauge(namespace, deployment2, status); + assertEquals(0, listener.getGauge(deployment2StatusId).get().getValue()); } } + private String getMetricIdForTotalDeploymentsInNamespace(String namespace) { + return listener.getNamespaceMetricId(FlinkDeployment.class, namespace, COUNTER_NAME); + } + + private String getMetricIdForTotalJmDeploymentsWithStatusInNamespace( + String namespace, JobManagerDeploymentStatus status) { + return listener.getNamespaceMetricId( + FlinkDeployment.class, + namespace, + JM_DEPLOYMENT_STATUS_GROUP_NAME, + status.name(), + COUNTER_NAME); + } + + private String getMetricIdForTotalJobsWithStatusInNamespace( + String namespace, JobStatus status) { + return listener.getNamespaceMetricId( + FlinkDeployment.class, + namespace, + JOB_STATUS_GROUP_NAME, + status.name(), + COUNTER_NAME); + } + + private String getMetricIdForJmDeploymentStatusTrackerGauge( + String namespace, FlinkDeployment deployment, JobManagerDeploymentStatus status) { + var deploymentName = deployment.getMetadata().getName(); + return listener.getResourceMetricId( + FlinkDeployment.class, + namespace, + deploymentName, + JM_DEPLOYMENT_STATUS_GROUP_NAME, + status.name(), + IN_STATUS_NAME); + } + + private String getMetricIdForJobStatusTrackerGauge( + String namespace, FlinkDeployment deployment, JobStatus status) { + var deploymentName = deployment.getMetadata().getName(); + return listener.getResourceMetricId( + FlinkDeployment.class, + namespace, + deploymentName, + JOB_STATUS_GROUP_NAME, + status.name(), + IN_STATUS_NAME); + } + @Test public void testMetricsMultiNamespace() { var namespace1 = "ns1"; @@ -122,10 +223,8 @@ public void testMetricsMultiNamespace() { var deployment1 = TestUtils.buildApplicationCluster("deployment", namespace1); var deployment2 = TestUtils.buildApplicationCluster("deployment", namespace2); - var counterId1 = - listener.getNamespaceMetricId(FlinkDeployment.class, namespace1, COUNTER_NAME); - var counterId2 = - listener.getNamespaceMetricId(FlinkDeployment.class, namespace2, COUNTER_NAME); + var counterId1 = getMetricIdForTotalDeploymentsInNamespace(namespace1); + var counterId2 = getMetricIdForTotalDeploymentsInNamespace(namespace2); assertTrue(listener.getGauge(counterId1).isEmpty()); assertTrue(listener.getGauge(counterId2).isEmpty()); var stateCounter1 = @@ -139,7 +238,7 @@ public void testMetricsMultiNamespace() { var stateCounter2 = listener.getNamespaceMetricId( FlinkDeployment.class, - namespace1, + namespace2, "Lifecycle", "State", ResourceLifecycleState.CREATED.name(), @@ -148,21 +247,19 @@ public void testMetricsMultiNamespace() { assertTrue(listener.getGauge(stateCounter2).isEmpty()); for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) { var statusId1 = - listener.getNamespaceMetricId( - FlinkDeployment.class, - namespace1, - STATUS_GROUP_NAME, - status.name(), - COUNTER_NAME); + getMetricIdForTotalJmDeploymentsWithStatusInNamespace(namespace1, status); var statusId2 = - listener.getNamespaceMetricId( - FlinkDeployment.class, - namespace2, - STATUS_GROUP_NAME, - status.name(), - COUNTER_NAME); + getMetricIdForTotalJmDeploymentsWithStatusInNamespace(namespace2, status); assertTrue(listener.getGauge(statusId1).isEmpty()); assertTrue(listener.getGauge(statusId2).isEmpty()); + + var deployment1StatusId = + getMetricIdForJmDeploymentStatusTrackerGauge(namespace1, deployment1, status); + assertTrue(listener.getGauge(deployment1StatusId).isEmpty()); + + var deployment2StatusId = + getMetricIdForJmDeploymentStatusTrackerGauge(namespace2, deployment2, status); + assertTrue(listener.getGauge(deployment2StatusId).isEmpty()); } metricManager.onUpdate(deployment1); @@ -177,21 +274,19 @@ public void testMetricsMultiNamespace() { metricManager.onUpdate(deployment1); metricManager.onUpdate(deployment2); var statusId1 = - listener.getNamespaceMetricId( - FlinkDeployment.class, - namespace1, - STATUS_GROUP_NAME, - status.name(), - COUNTER_NAME); + getMetricIdForTotalJmDeploymentsWithStatusInNamespace(namespace1, status); var statusId2 = - listener.getNamespaceMetricId( - FlinkDeployment.class, - namespace2, - STATUS_GROUP_NAME, - status.name(), - COUNTER_NAME); + getMetricIdForTotalJmDeploymentsWithStatusInNamespace(namespace2, status); assertEquals(1, listener.getGauge(statusId1).get().getValue()); assertEquals(1, listener.getGauge(statusId2).get().getValue()); + + var deployment1StatusId = + getMetricIdForJmDeploymentStatusTrackerGauge(namespace1, deployment1, status); + assertEquals(1, listener.getGauge(deployment1StatusId).get().getValue()); + + var deployment2StatusId = + getMetricIdForJmDeploymentStatusTrackerGauge(namespace2, deployment2, status); + assertEquals(1, listener.getGauge(deployment2StatusId).get().getValue()); } metricManager.onRemove(deployment1); @@ -205,21 +300,19 @@ public void testMetricsMultiNamespace() { deployment1.getStatus().setJobManagerDeploymentStatus(status); deployment2.getStatus().setJobManagerDeploymentStatus(status); var statusId1 = - listener.getNamespaceMetricId( - FlinkDeployment.class, - namespace1, - STATUS_GROUP_NAME, - status.name(), - COUNTER_NAME); + getMetricIdForTotalJmDeploymentsWithStatusInNamespace(namespace1, status); var statusId2 = - listener.getNamespaceMetricId( - FlinkDeployment.class, - namespace2, - STATUS_GROUP_NAME, - status.name(), - COUNTER_NAME); + getMetricIdForTotalJmDeploymentsWithStatusInNamespace(namespace2, status); assertEquals(0, listener.getGauge(statusId1).get().getValue()); assertEquals(0, listener.getGauge(statusId2).get().getValue()); + + var deployment1StatusId = + getMetricIdForJmDeploymentStatusTrackerGauge(namespace1, deployment1, status); + assertEquals(0, listener.getGauge(deployment1StatusId).get().getValue()); + + var deployment2StatusId = + getMetricIdForJmDeploymentStatusTrackerGauge(namespace2, deployment2, status); + assertEquals(0, listener.getGauge(deployment2StatusId).get().getValue()); } } @@ -477,7 +570,7 @@ public void testMetricsDisabled() { listener.getNamespaceMetricId( FlinkDeployment.class, namespace, - STATUS_GROUP_NAME, + JM_DEPLOYMENT_STATUS_GROUP_NAME, status.name(), COUNTER_NAME); assertTrue(listener.getGauge(statusId).isEmpty()); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java index 208c446f26..b31b62f6b1 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java @@ -44,6 +44,11 @@ public class TestingMetricListener { private static final String HOST = "test-op-host"; private final KubernetesOperatorMetricGroup metricGroup; private final Map metrics = new HashMap(); + + public Map getMetrics() { + return metrics; + } + private final ScheduledExecutorService executor; private Configuration configuration; private ViewUpdater viewUpdater; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java index a399d871cc..6edb63453c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java @@ -62,23 +62,29 @@ public class ResourceLifecycleMetricsTest { @Test public void lifecycleStateTest() { + Configuration configuration = new Configuration(); + TestingMetricListener listener = new TestingMetricListener(configuration); + MetricManager metricManager = + MetricManager.createFlinkDeploymentMetricManager( + configuration, listener.getMetricGroup()); + var application = TestUtils.buildApplicationCluster(); - assertEquals(CREATED, application.getStatus().getLifecycleState()); + assertAppInExpectedState(application, CREATED, metricManager, listener); - ReconciliationUtils.updateStatusBeforeDeploymentAttempt(application, new Configuration()); - assertEquals(UPGRADING, application.getStatus().getLifecycleState()); + ReconciliationUtils.updateStatusBeforeDeploymentAttempt(application, configuration); + assertAppInExpectedState(application, UPGRADING, metricManager, listener); - ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration()); - assertEquals(DEPLOYED, application.getStatus().getLifecycleState()); + ReconciliationUtils.updateStatusForDeployedSpec(application, configuration); + assertAppInExpectedState(application, DEPLOYED, metricManager, listener); application.getStatus().getReconciliationStatus().markReconciledSpecAsStable(); - assertEquals(STABLE, application.getStatus().getLifecycleState()); + assertAppInExpectedState(application, STABLE, metricManager, listener); application.getStatus().setError("errr"); - assertEquals(STABLE, application.getStatus().getLifecycleState()); + assertAppInExpectedState(application, STABLE, metricManager, listener); application.getStatus().getJobStatus().setState(JobStatus.FAILED); - assertEquals(FAILED, application.getStatus().getLifecycleState()); + assertAppInExpectedState(application, FAILED, metricManager, listener); application.getStatus().setError(null); @@ -86,19 +92,45 @@ public void lifecycleStateTest() { .getStatus() .getReconciliationStatus() .setState(ReconciliationState.ROLLING_BACK); - assertEquals(ROLLING_BACK, application.getStatus().getLifecycleState()); + assertAppInExpectedState(application, ROLLING_BACK, metricManager, listener); application.getStatus().getJobStatus().setState(JobStatus.RECONCILING); application.getStatus().getReconciliationStatus().setState(ReconciliationState.ROLLED_BACK); - assertEquals(ROLLED_BACK, application.getStatus().getLifecycleState()); + assertAppInExpectedState(application, ROLLED_BACK, metricManager, listener); application.getStatus().getJobStatus().setState(JobStatus.FAILED); - assertEquals(FAILED, application.getStatus().getLifecycleState()); + assertAppInExpectedState(application, FAILED, metricManager, listener); application.getStatus().getJobStatus().setState(JobStatus.RUNNING); application.getSpec().getJob().setState(JobState.SUSPENDED); - ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration()); - assertEquals(SUSPENDED, application.getStatus().getLifecycleState()); + ReconciliationUtils.updateStatusForDeployedSpec(application, configuration); + assertAppInExpectedState(application, SUSPENDED, metricManager, listener); + } + + private static void assertAppInExpectedState( + FlinkDeployment application, + ResourceLifecycleState expectedState, + MetricManager metricManager, + TestingMetricListener listener) { + assertEquals(expectedState, application.getStatus().getLifecycleState()); + + metricManager.onUpdate(application); + + for (var candidateState : ResourceLifecycleState.values()) { + var stateGaugeMetricId = + listener.getResourceMetricId( + FlinkDeployment.class, + application.getMetadata().getNamespace(), + application.getMetadata().getName(), + "Lifecycle", + "State", + candidateState.name(), + "InState"); + + var expectedGaugeValue = candidateState == expectedState ? 1 : 0; + assertEquals( + expectedGaugeValue, listener.getGauge(stateGaugeMetricId).get().getValue()); + } } @Test @@ -108,7 +140,7 @@ public void testLifecycleTracker() { var lifecycleTracker = new ResourceLifecycleMetricTracker( - CREATED, Instant.ofEpochMilli(0), transitionHistos, timeHistos); + "ns", "n1", CREATED, Instant.ofEpochMilli(0), transitionHistos, timeHistos); long ts = 1000; lifecycleTracker.onUpdate(CREATED, Instant.ofEpochMilli(ts)); @@ -233,6 +265,7 @@ public void testLifecycleMetricsConfig() { public void testGlobalHistoNames() { var conf = new Configuration(); var testingMetricListener = new TestingMetricListener(new Configuration()); + var deploymentMetricManager = MetricManager.createFlinkDeploymentMetricManager( conf, testingMetricListener.getMetricGroup());