Skip to content

Commit 9344765

Browse files
committed
Update CompositeHealthIndicator to be able to run HealthIndicators concurrently.
1 parent 0d32481 commit 9344765

File tree

2 files changed

+256
-4
lines changed

2 files changed

+256
-4
lines changed

spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java

+124-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@
1818

1919
import java.util.LinkedHashMap;
2020
import java.util.Map;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.ExecutionException;
23+
import java.util.concurrent.Executor;
24+
import java.util.concurrent.Future;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.TimeoutException;
27+
28+
import org.springframework.util.Assert;
2129

2230
/**
2331
* {@link HealthIndicator} that returns health indications from all registered delegates.
@@ -33,6 +41,8 @@ public class CompositeHealthIndicator implements HealthIndicator {
3341

3442
private final HealthAggregator aggregator;
3543

44+
private final HealthStrategy strategy;
45+
3646
/**
3747
* Create a new {@link CompositeHealthIndicator} from the specified indicators.
3848
* @param healthAggregator the health aggregator
@@ -50,8 +60,14 @@ public CompositeHealthIndicator(HealthAggregator healthAggregator, Map<String, H
5060
* @param registry the registry of {@link HealthIndicator HealthIndicators}.
5161
*/
5262
public CompositeHealthIndicator(HealthAggregator healthAggregator, HealthIndicatorRegistry registry) {
63+
this(healthAggregator, registry, new SequentialStrategy());
64+
}
65+
66+
private CompositeHealthIndicator(HealthAggregator healthAggregator, HealthIndicatorRegistry registry,
67+
HealthStrategy strategy) {
5368
this.aggregator = healthAggregator;
5469
this.registry = registry;
70+
this.strategy = strategy;
5571
}
5672

5773
/**
@@ -63,13 +79,117 @@ public HealthIndicatorRegistry getRegistry() {
6379
return this.registry;
6480
}
6581

82+
/**
83+
* Returns a new {@link CompositeHealthIndicator} with a parallel strategy that
84+
* returns health indications from all registered delegates concurrently.
85+
* @param timeout number of milliseconds to wait before using the
86+
* {@code timeoutHealth}
87+
* @param timeoutHealth the {@link Health} to use if an health indicator reached the
88+
* {@code timeout}. Defaults to {@code unknown} status.
89+
* @param executor the executor to submit {@link HealthIndicator HealthIndicators} on.
90+
* @return new instance with a parallel strategy
91+
* @since 2.2.0
92+
*/
93+
public CompositeHealthIndicator parallel(Executor executor, long timeout, Health timeoutHealth) {
94+
Assert.notNull(executor, "Executor must not be null");
95+
ParallelStrategy strategy = new ParallelStrategy(executor, timeout, timeoutHealth);
96+
return new CompositeHealthIndicator(this.aggregator, this.registry, strategy);
97+
}
98+
99+
/**
100+
* Returns a new {@link CompositeHealthIndicator} with a parallel strategy that
101+
* returns health indications from all registered delegates concurrently.
102+
* @param executor the executor to submit {@link HealthIndicator HealthIndicators} on.
103+
* @return new instance with a parallel strategy
104+
* @since 2.2.0
105+
*/
106+
public CompositeHealthIndicator parallel(Executor executor) {
107+
Assert.notNull(executor, "Executor must not be null");
108+
ParallelStrategy strategy = new ParallelStrategy(executor, null, null);
109+
return new CompositeHealthIndicator(this.aggregator, this.registry, strategy);
110+
}
111+
112+
/**
113+
* Returns a new {@link CompositeHealthIndicator} with a sequential strategy that
114+
* returns health indications from all registered delegates sequentially.
115+
* @return new instance with a sequential strategy
116+
* @since 2.2.0
117+
*/
118+
public CompositeHealthIndicator sequential() {
119+
return new CompositeHealthIndicator(this.aggregator, this.registry, new SequentialStrategy());
120+
}
121+
66122
@Override
67123
public Health health() {
68-
Map<String, Health> healths = new LinkedHashMap<>();
69-
for (Map.Entry<String, HealthIndicator> entry : this.registry.getAll().entrySet()) {
70-
healths.put(entry.getKey(), entry.getValue().health());
71-
}
124+
Map<String, Health> healths = this.strategy.doHealth(this.registry.getAll());
72125
return this.aggregator.aggregate(healths);
73126
}
74127

128+
@FunctionalInterface
129+
private interface HealthStrategy {
130+
131+
Map<String, Health> doHealth(Map<String, HealthIndicator> healthIndicators);
132+
133+
}
134+
135+
private static final class SequentialStrategy implements HealthStrategy {
136+
137+
@Override
138+
public Map<String, Health> doHealth(Map<String, HealthIndicator> healthIndicators) {
139+
Map<String, Health> healths = new LinkedHashMap<>();
140+
for (Map.Entry<String, HealthIndicator> entry : healthIndicators.entrySet()) {
141+
healths.put(entry.getKey(), entry.getValue().health());
142+
}
143+
return healths;
144+
}
145+
146+
}
147+
148+
private static final class ParallelStrategy implements HealthStrategy {
149+
150+
private final Executor executor;
151+
152+
private final Long timeout;
153+
154+
private final Health timeoutHealth;
155+
156+
private ParallelStrategy(Executor executor, Long timeout, Health timeoutHealth) {
157+
this.executor = executor;
158+
this.timeout = timeout;
159+
this.timeoutHealth = (timeoutHealth != null) ? timeoutHealth : Health.unknown().build();
160+
}
161+
162+
@Override
163+
public Map<String, Health> doHealth(Map<String, HealthIndicator> healthIndicators) {
164+
Map<String, Future<Health>> healthsFutures = new LinkedHashMap<>();
165+
for (Map.Entry<String, HealthIndicator> entry : healthIndicators.entrySet()) {
166+
healthsFutures.put(entry.getKey(),
167+
CompletableFuture.supplyAsync(entry.getValue()::health, this.executor));
168+
}
169+
Map<String, Health> healths = new LinkedHashMap<>();
170+
for (Map.Entry<String, Future<Health>> entry : healthsFutures.entrySet()) {
171+
healths.put(entry.getKey(), getHealth(entry.getValue(), this.timeout, this.timeoutHealth));
172+
}
173+
return healths;
174+
}
175+
176+
private static Health getHealth(Future<Health> healthFuture, Long timeout, Health timeoutHealth) {
177+
try {
178+
return (timeout != null) ? healthFuture.get(timeout, TimeUnit.MILLISECONDS) : healthFuture.get();
179+
}
180+
catch (InterruptedException ex) {
181+
Thread.currentThread().interrupt();
182+
return Health.unknown().withException(ex).build();
183+
}
184+
catch (TimeoutException ex) {
185+
return timeoutHealth;
186+
}
187+
catch (ExecutionException ex) {
188+
Throwable cause = ex.getCause();
189+
return Health.down((cause instanceof Exception) ? ((Exception) cause) : ex).build();
190+
}
191+
}
192+
193+
}
194+
75195
}

spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/health/CompositeHealthIndicatorTests.java

+132
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,22 @@
1919
import java.util.Collections;
2020
import java.util.HashMap;
2121
import java.util.Map;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.ScheduledExecutorService;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicReference;
2227

2328
import com.fasterxml.jackson.databind.ObjectMapper;
29+
import org.junit.jupiter.api.AfterAll;
2430
import org.junit.jupiter.api.BeforeEach;
2531
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.api.Timeout;
2633
import org.mockito.Mock;
2734
import org.mockito.MockitoAnnotations;
2835

36+
import org.springframework.util.StopWatch;
37+
2938
import static org.assertj.core.api.Assertions.assertThat;
3039
import static org.mockito.BDDMockito.given;
3140

@@ -38,6 +47,8 @@
3847
*/
3948
class CompositeHealthIndicatorTests {
4049

50+
private static final ExecutorService executor = Executors.newCachedThreadPool();
51+
4152
private HealthAggregator healthAggregator;
4253

4354
@Mock
@@ -55,6 +66,11 @@ void setup() {
5566
this.healthAggregator = new OrderedHealthAggregator();
5667
}
5768

69+
@AfterAll
70+
static void shutdownExecutor() {
71+
executor.shutdown();
72+
}
73+
5874
@Test
5975
void createWithIndicators() {
6076
Map<String, HealthIndicator> indicators = new HashMap<>();
@@ -85,4 +101,120 @@ void testSerialization() throws Exception {
85101
+ ":{\"1\":\"1\"}},\"db2\":{\"status\":\"UNKNOWN\",\"details\":{\"2\":\"2\"}}}}}}");
86102
}
87103

104+
@Test
105+
void testWithSequentialStrategy() {
106+
Map<String, HealthIndicator> indicators = new HashMap<>();
107+
indicators.put("slow-1", new TimeoutHealth(200, Status.UP));
108+
indicators.put("slow-2", new TimeoutHealth(300, Status.UP));
109+
CompositeHealthIndicator indicator = new CompositeHealthIndicator(this.healthAggregator, indicators)
110+
.sequential();
111+
StopWatch watch = new StopWatch();
112+
watch.start();
113+
Health health = indicator.health();
114+
watch.stop();
115+
assertThat(watch.getLastTaskTimeMillis()).isBetween(500L, 750L);
116+
assertThat(health.getStatus()).isEqualTo(Status.UP);
117+
assertThat(health.getDetails()).containsOnlyKeys("slow-1", "slow-2");
118+
assertThat(health.getDetails().get("slow-1")).isEqualTo(Health.up().build());
119+
assertThat(health.getDetails().get("slow-2")).isEqualTo(Health.up().build());
120+
}
121+
122+
@Test
123+
@Timeout(1)
124+
void testWithParallelStrategy() {
125+
Map<String, HealthIndicator> indicators = new HashMap<>();
126+
indicators.put("slow-1", new TimeoutHealth(300, Status.UP));
127+
indicators.put("slow-2", new TimeoutHealth(800, Status.UP));
128+
IllegalStateException ex = new IllegalStateException("No Connection");
129+
indicators.put("error", () -> {
130+
throw ex;
131+
});
132+
CompositeHealthIndicator indicator = new CompositeHealthIndicator(this.healthAggregator, indicators)
133+
.parallel(executor);
134+
Health health = indicator.health();
135+
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
136+
assertThat(health.getDetails()).containsOnlyKeys("slow-1", "slow-2", "error");
137+
assertThat(health.getDetails().get("slow-1")).isEqualTo(Health.up().build());
138+
assertThat(health.getDetails().get("slow-2")).isEqualTo(Health.up().build());
139+
assertThat(health.getDetails().get("error")).isEqualTo(Health.down(ex).build());
140+
}
141+
142+
@Test
143+
void testWithParallelStrategyTimeoutReached() {
144+
Map<String, HealthIndicator> indicators = new HashMap<>();
145+
indicators.put("slow", new TimeoutHealth(250, Status.UP));
146+
indicators.put("fast", new TimeoutHealth(10, Status.UP));
147+
CompositeHealthIndicator indicator = new CompositeHealthIndicator(this.healthAggregator, indicators)
148+
.parallel(executor, 200, null);
149+
Health health = indicator.health();
150+
assertThat(health.getStatus()).isEqualTo(Status.UP);
151+
assertThat(health.getDetails()).containsOnlyKeys("slow", "fast");
152+
assertThat(health.getDetails().get("slow")).isEqualTo(Health.unknown().build());
153+
assertThat(health.getDetails().get("fast")).isEqualTo(Health.up().build());
154+
}
155+
156+
@Test
157+
void testWithParallelStrategyTimeoutReachedCustomTimeoutFallback() {
158+
Map<String, HealthIndicator> indicators = new HashMap<>();
159+
indicators.put("slow", new TimeoutHealth(250, Status.UP));
160+
indicators.put("fast", new TimeoutHealth(10, Status.UP));
161+
CompositeHealthIndicator indicator = new CompositeHealthIndicator(this.healthAggregator, indicators)
162+
.parallel(executor, 200, Health.down().build());
163+
Health health = indicator.health();
164+
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
165+
assertThat(health.getDetails()).containsOnlyKeys("slow", "fast");
166+
assertThat(health.getDetails().get("slow")).isEqualTo(Health.down().build());
167+
assertThat(health.getDetails().get("fast")).isEqualTo(Health.up().build());
168+
}
169+
170+
@Test
171+
void testWithParallelStrategyInterrupted() throws InterruptedException {
172+
Map<String, HealthIndicator> indicators = new HashMap<>();
173+
indicators.put("slow", new TimeoutHealth(800, Status.UP));
174+
indicators.put("fast", new TimeoutHealth(300, Status.UP));
175+
InterruptedException ex = new InterruptedException();
176+
CompositeHealthIndicator indicator = new CompositeHealthIndicator(this.healthAggregator, indicators)
177+
.parallel(executor);
178+
AtomicReference<Health> healthReference = new AtomicReference<>();
179+
Thread thread = new Thread(() -> healthReference.set(indicator.health()));
180+
thread.start();
181+
thread.join(100);
182+
thread.interrupt();
183+
thread.join();
184+
Health health = healthReference.get();
185+
assertThat(health.getStatus()).isEqualTo(Status.UNKNOWN);
186+
assertThat(health.getDetails()).containsOnlyKeys("slow", "fast");
187+
assertThat(health.getDetails().get("slow")).isEqualTo(Health.unknown().withException(ex).build());
188+
assertThat(health.getDetails().get("fast")).isEqualTo(Health.unknown().withException(ex).build());
189+
}
190+
191+
private static final class TimeoutHealth implements HealthIndicator {
192+
193+
private final long timeout;
194+
195+
private final Status status;
196+
197+
TimeoutHealth(long timeout, Status status) {
198+
this.timeout = timeout;
199+
this.status = status;
200+
}
201+
202+
@Override
203+
public Health health() {
204+
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
205+
try {
206+
return executorService
207+
.schedule(() -> Health.status(this.status).build(), this.timeout, TimeUnit.MILLISECONDS).get();
208+
}
209+
catch (Exception ex) {
210+
// never
211+
throw new RuntimeException(ex);
212+
}
213+
finally {
214+
executorService.shutdown();
215+
}
216+
}
217+
218+
}
219+
88220
}

0 commit comments

Comments
 (0)