Commit cbedae2

Allow refetching instances for healthcheck (#855)
* Allow refetching instances by HealthCheckServiceInstanceListSupplier.
* Add docs and javadocs.
* Fix docs after review.
1 parent 7287a3c commit cbedae2

5 files changed: +135 -42 lines changed

docs/src/main/asciidoc/_configprops.adoc

Lines changed: 3 additions & 4 deletions
@@ -10,10 +10,6 @@
 |spring.cloud.discovery.client.health-indicator.enabled | true |
 |spring.cloud.discovery.client.health-indicator.include-description | false |
 |spring.cloud.discovery.client.simple.instances | |
-|spring.cloud.discovery.client.simple.local.instance-id | | The unique identifier or name for the service instance.
-|spring.cloud.discovery.client.simple.local.metadata | | Metadata for the service instance. Can be used by discovery clients to modify their behaviour per instance, e.g. when load balancing.
-|spring.cloud.discovery.client.simple.local.service-id | | The identifier or name for the service. Multiple instances might share the same service ID.
-|spring.cloud.discovery.client.simple.local.uri | | The URI of the service instance. Will be parsed to extract the scheme, host, and port.
 |spring.cloud.discovery.client.simple.order | |
 |spring.cloud.discovery.enabled | true | Enables discovery client health indicators.
 |spring.cloud.features.enabled | true | Enables the features endpoint.
@@ -34,6 +30,9 @@
 |spring.cloud.loadbalancer.health-check.initial-delay | 0 | Initial delay value for the HealthCheck scheduler.
 |spring.cloud.loadbalancer.health-check.interval | 25s | Interval for rerunning the HealthCheck scheduler.
 |spring.cloud.loadbalancer.health-check.path | |
+|spring.cloud.loadbalancer.health-check.refetch-instances | false | Indicates whether the instances should be refetched by the <code>HealthCheckServiceInstanceListSupplier</code>. This can be used if the instances can be updated and the underlying delegate does not provide an ongoing flux.
+|spring.cloud.loadbalancer.health-check.refetch-instances-interval | 25s | Interval for refetching available service instances.
+|spring.cloud.loadbalancer.health-check.repeat-health-check | true | Indicates whether health checks should keep repeating. It might be useful to set it to <code>false</code> if periodically refetching the instances, as every refetch will also trigger a healthcheck.
 |spring.cloud.loadbalancer.retry.enabled | true |
 |spring.cloud.loadbalancer.retry.max-retries-on-next-service-instance | 1 | Number of retries to be executed on the next <code>ServiceInstance</code>. A <code>ServiceInstance</code> is chosen before each retry call.
 |spring.cloud.loadbalancer.retry.max-retries-on-same-service-instance | 0 | Number of retries to be executed on the same <code>ServiceInstance</code>.

docs/src/main/asciidoc/spring-cloud-commons.adoc

Lines changed: 9 additions & 3 deletions
@@ -953,16 +953,22 @@ TIP: This mechanism is particularly helpful while using the `SimpleDiscoveryClie
 clients backed by an actual Service Registry, it's not necessary to use, as we already get
 healthy instances after querying the external ServiceDiscovery.

-TIP:: This supplier is also recommended for setups with a small number of instances per service
+TIP: This supplier is also recommended for setups with a small number of instances per service
 in order to avoid retrying calls on a failing instance.

+WARNING: If using any of the Service Discovery-backed suppliers, adding this health-check mechanism is usually not necessary, as we retrieve the health state of the instances directly
+from the Service Registry.
+
+TIP: The `HealthCheckServiceInstanceListSupplier` relies on having updated instances provided by a delegate flux. In the rare cases when you want to use a delegate that does not refresh the instances, even though the list of instances may change (such as the `ReactiveDiscoveryClientServiceInstanceListSupplier` provided by us), you can set `spring.cloud.loadbalancer.health-check.refetch-instances` to `true` to have the instance list refreshed by the `HealthCheckServiceInstanceListSupplier`. You can then also adjust the refetch interval by modifying the value of `spring.cloud.loadbalancer.health-check.refetch-instances-interval` and opt to disable the additional healthcheck repetitions by setting `spring.cloud.loadbalancer.health-check.repeat-health-check` to `false`, as every instance refetch
+will also trigger a healthcheck.
+
 `HealthCheckServiceInstanceListSupplier` uses properties prefixed with
 `spring.cloud.loadbalancer.health-check`. You can set the `initialDelay` and `interval`
 for the scheduler. You can set the default path for the healthcheck URL by setting
 the value of the `spring.cloud.loadbalancer.health-check.path.default`. You can also set a specific value
 for any given service by setting the value of the `spring.cloud.loadbalancer.health-check.path.[SERVICE_ID]`, substituting the `[SERVICE_ID]` with the correct ID of your service. If the path is not set, `/actuator/health` is used by default.

-TIP:: If you rely on the default path (`/actuator/health`), make sure you add `spring-boot-starter-actuator` to your collaborator's dependencies, unless you are planning to add such an endpoint on your own.
+TIP: If you rely on the default path (`/actuator/health`), make sure you add `spring-boot-starter-actuator` to your collaborator's dependencies, unless you are planning to add such an endpoint on your own.

 In order to use the health-check scheduler approach, you will have to instantiate a `HealthCheckServiceInstanceListSupplier` bean in a <<custom-loadbalancer-configuration,custom configuration>>.

@@ -987,7 +993,7 @@ public class CustomLoadBalancerConfiguration {
 }
 ----

-NOTE:: `HealthCheckServiceInstanceListSupplier` has its own caching mechanism based on Reactor Flux `replay()`, therefore, if it's being used, you may want to skip wrapping that supplier with `CachingServiceInstanceListSupplier`.
+WARNING: `HealthCheckServiceInstanceListSupplier` has its own caching mechanism based on Reactor Flux `replay()`. Therefore, if it's being used, you may want to skip wrapping that supplier with `CachingServiceInstanceListSupplier`.


 [[spring-cloud-loadbalancer-starter]]
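
The documentation hunk above describes a three-step path lookup: a per-service entry, then the `default` entry, then `/actuator/health`. A minimal, dependency-free sketch of that lookup order follows. `HealthCheckPaths` and its methods are hypothetical names used only for illustration, and the `TreeMap` with a case-insensitive comparator is an assumed stand-in for the `LinkedCaseInsensitiveMap` used by the real properties class. It is not the supplier's actual code.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper illustrating the documented lookup order; not part of the commit.
final class HealthCheckPaths {

    // Fallback named in the documentation when no path is configured.
    static final String FALLBACK = "/actuator/health";

    // Resolves the path for a service: specific entry, then "default", then the fallback.
    static String resolve(Map<String, String> paths, String serviceId) {
        String defaultPath = paths.getOrDefault("default", FALLBACK);
        return paths.getOrDefault(serviceId, defaultPath);
    }

    // Case-insensitive map, assumed here as a stand-in for LinkedCaseInsensitiveMap.
    static Map<String, String> caseInsensitive() {
        return new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    }
}
```

With an empty map every service resolves to `/actuator/health`, which is why the TIP above recommends adding the actuator starter when relying on the default.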

spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerProperties.java

Lines changed: 44 additions & 0 deletions
@@ -56,8 +56,44 @@ public static class HealthCheck {
          */
         private Duration interval = Duration.ofSeconds(25);

+        /**
+         * Interval for refetching available service instances.
+         */
+        private Duration refetchInstancesInterval = Duration.ofSeconds(25);
+
         private Map<String, String> path = new LinkedCaseInsensitiveMap<>();

+        /**
+         * Indicates whether the instances should be refetched by the
+         * <code>HealthCheckServiceInstanceListSupplier</code>. This can be used if the
+         * instances can be updated and the underlying delegate does not provide an
+         * ongoing flux.
+         */
+        private boolean refetchInstances = false;
+
+        /**
+         * Indicates whether health checks should keep repeating. It might be useful to
+         * set it to <code>false</code> if periodically refetching the instances, as every
+         * refetch will also trigger a healthcheck.
+         */
+        private boolean repeatHealthCheck = true;
+
+        public boolean getRefetchInstances() {
+            return refetchInstances;
+        }
+
+        public void setRefetchInstances(boolean refetchInstances) {
+            this.refetchInstances = refetchInstances;
+        }
+
+        public boolean getRepeatHealthCheck() {
+            return repeatHealthCheck;
+        }
+
+        public void setRepeatHealthCheck(boolean repeatHealthCheck) {
+            this.repeatHealthCheck = repeatHealthCheck;
+        }
+
         public int getInitialDelay() {
             return initialDelay;
         }
@@ -66,6 +102,14 @@ public void setInitialDelay(int initialDelay) {
             this.initialDelay = initialDelay;
         }

+        public Duration getRefetchInstancesInterval() {
+            return refetchInstancesInterval;
+        }
+
+        public void setRefetchInstancesInterval(Duration refetchInstancesInterval) {
+            this.refetchInstancesInterval = refetchInstancesInterval;
+        }
+
         public Map<String, String> getPath() {
             return path;
         }
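
To make the intended combination of the new settings easier to see, here is a small dependency-free sketch. `HealthCheckSettings` and `refetchDriven` are hypothetical names, not part of the commit; the class only mirrors the three fields added above, with the same defaults, and shows the combination the documentation suggests for a delegate that does not refresh its instance list.

```java
import java.time.Duration;

// Hypothetical stand-in for LoadBalancerProperties.HealthCheck;
// only the three settings added by this change are modeled.
final class HealthCheckSettings {

    // Defaults copied from the fields introduced in the diff above.
    Duration refetchInstancesInterval = Duration.ofSeconds(25);
    boolean refetchInstances = false;
    boolean repeatHealthCheck = true;

    // Combination suggested by the docs: refetch periodically and let each
    // refetch drive the health check instead of repeating it separately.
    static HealthCheckSettings refetchDriven(Duration interval) {
        HealthCheckSettings settings = new HealthCheckSettings();
        settings.refetchInstances = true;
        settings.refetchInstancesInterval = interval;
        settings.repeatHealthCheck = false;
        return settings;
    }
}
```

In a real application the same values would be set through the `spring.cloud.loadbalancer.health-check.*` properties rather than in code.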

spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java

Lines changed: 13 additions & 4 deletions
@@ -26,6 +26,7 @@
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.retry.Repeat;

 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
@@ -64,14 +65,19 @@ public class HealthCheckServiceInstanceListSupplier
     public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
             LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) {
         super(delegate);
-        this.healthCheck = healthCheck;
         defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default",
                 "/actuator/health");
         this.webClient = webClient;
-        aliveInstancesReplay = Flux.defer(delegate)
-                .delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay()))
+        this.healthCheck = healthCheck;
+        Repeat<Object> aliveInstancesReplayRepeat = Repeat
+                .onlyIf(repeatContext -> this.healthCheck.getRefetchInstances())
+                .fixedBackoff(healthCheck.getRefetchInstancesInterval());
+        Flux<List<ServiceInstance>> aliveInstancesFlux = Flux.defer(delegate)
                 .switchMap(serviceInstances -> healthCheckFlux(serviceInstances).map(
                         alive -> Collections.unmodifiableList(new ArrayList<>(alive))))
+                .repeatWhen(aliveInstancesReplayRepeat);
+        aliveInstancesReplay = aliveInstancesFlux
+                .delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay()))
                 .replay(1).refCount(1);
     }

@@ -86,6 +92,9 @@ public void afterPropertiesSet() {

     protected Flux<List<ServiceInstance>> healthCheckFlux(
             List<ServiceInstance> instances) {
+        Repeat<Object> healthCheckFluxRepeat = Repeat
+                .onlyIf(repeatContext -> healthCheck.getRepeatHealthCheck())
+                .fixedBackoff(healthCheck.getInterval());
         return Flux.defer(() -> {
             List<Mono<ServiceInstance>> checks = new ArrayList<>(instances.size());
             for (ServiceInstance instance : instances) {
@@ -117,7 +126,7 @@ protected Flux<List<ServiceInstance>> healthCheckFlux(
                     result.add(alive);
                     return result;
                 }).defaultIfEmpty(result);
-        }).repeatWhen(restart -> restart.delayElements(healthCheck.getInterval()));
+        }).repeatWhen(healthCheckFluxRepeat);
     }

     @Override
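
The effect of `refetchInstances` can be illustrated with a deliberately simplified, synchronous model. This is a sketch under stated assumptions, not the Reactor pipeline from the diff: `RefetchModel` and `run` are hypothetical names, timing (initial delay and refetch interval) is ignored, `repeatHealthCheck` is assumed to be `false`, and each pass emits one list of alive instances rather than incremental updates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical synchronous model of the refetch loop; not the actual supplier.
final class RefetchModel {

    // Runs fetch-and-check passes. Each pass asks the delegate for instances and
    // keeps those that pass the health check. Without refetching, the delegate is
    // queried at most once, no matter how many rounds are requested.
    static <T> List<List<T>> run(Supplier<List<T>> delegate, Predicate<T> isAlive,
            boolean refetchInstances, int rounds) {
        List<List<T>> emitted = new ArrayList<>();
        int passes = refetchInstances ? rounds : Math.min(rounds, 1);
        for (int i = 0; i < passes; i++) {
            List<T> alive = new ArrayList<>();
            for (T instance : delegate.get()) {
                if (isAlive.test(instance)) {
                    alive.add(instance);
                }
            }
            emitted.add(Collections.unmodifiableList(alive));
        }
        return emitted;
    }
}
```

This mirrors the scenario in the `shouldRefetchInstances` test in this commit: a delegate that answers with a different list on its second call only has that second list observed when refetching is enabled.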

spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java

Lines changed: 66 additions & 31 deletions
@@ -17,6 +17,7 @@
 package org.springframework.cloud.loadbalancer.core;

 import java.time.Duration;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -47,6 +48,8 @@
 import org.springframework.web.reactive.function.client.WebClient;

 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;

 /**
  * Tests for {@link HealthCheckServiceInstanceListSupplier}.
@@ -79,7 +82,7 @@ void setUp() {
     }

     @AfterEach
-    void tearDown() throws Exception {
+    void tearDown() {
         if (listSupplier != null) {
             listSupplier.destroy();
             listSupplier = null;
@@ -140,14 +143,14 @@ void shouldReturnOnlyAliveService() {
                 SERVICE_ID, "127.0.0.2", port, false);

         StepVerifier.withVirtualTime(() -> {
-            ServiceInstanceListSupplier delegate = Mockito
-                    .mock(ServiceInstanceListSupplier.class);
+            ServiceInstanceListSupplier delegate = mock(
+                    ServiceInstanceListSupplier.class);
             Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
             Mockito.when(delegate.get()).thenReturn(
                     Flux.just(Lists.list(serviceInstance1, serviceInstance2)));

-            HealthCheckServiceInstanceListSupplier mock = Mockito
-                    .mock(HealthCheckServiceInstanceListSupplier.class);
+            HealthCheckServiceInstanceListSupplier mock = mock(
+                    HealthCheckServiceInstanceListSupplier.class);
             Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance1);
             Mockito.doReturn(Mono.just(false)).when(mock).isAlive(serviceInstance2);

@@ -176,14 +179,14 @@ void shouldEmitOnEachAliveServiceInBatch() {
                 SERVICE_ID, "127.0.0.2", port, false);

         StepVerifier.withVirtualTime(() -> {
-            ServiceInstanceListSupplier delegate = Mockito
-                    .mock(ServiceInstanceListSupplier.class);
+            ServiceInstanceListSupplier delegate = mock(
+                    ServiceInstanceListSupplier.class);
             Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
             Mockito.when(delegate.get()).thenReturn(
                     Flux.just(Lists.list(serviceInstance1, serviceInstance2)));

-            HealthCheckServiceInstanceListSupplier mock = Mockito
-                    .mock(HealthCheckServiceInstanceListSupplier.class);
+            HealthCheckServiceInstanceListSupplier mock = mock(
+                    HealthCheckServiceInstanceListSupplier.class);
             Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance1);
             Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance2);

@@ -213,14 +216,14 @@ void shouldNotFailIfIsAliveReturnsError() {
                 SERVICE_ID, "127.0.0.2", port, false);

         StepVerifier.withVirtualTime(() -> {
-            ServiceInstanceListSupplier delegate = Mockito
-                    .mock(ServiceInstanceListSupplier.class);
+            ServiceInstanceListSupplier delegate = mock(
+                    ServiceInstanceListSupplier.class);
             Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
             Mockito.when(delegate.get()).thenReturn(
                     Flux.just(Lists.list(serviceInstance1, serviceInstance2)));

-            HealthCheckServiceInstanceListSupplier mock = Mockito
-                    .mock(HealthCheckServiceInstanceListSupplier.class);
+            HealthCheckServiceInstanceListSupplier mock = mock(
+                    HealthCheckServiceInstanceListSupplier.class);
             Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance1);
             Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(mock)
                     .isAlive(serviceInstance2);
@@ -250,8 +253,8 @@ void shouldEmitAllInstancesIfAllIsAliveChecksFailed() {
                 SERVICE_ID, "127.0.0.2", port, false);

         StepVerifier.withVirtualTime(() -> {
-            ServiceInstanceListSupplier delegate = Mockito
-                    .mock(ServiceInstanceListSupplier.class);
+            ServiceInstanceListSupplier delegate = mock(
+                    ServiceInstanceListSupplier.class);
             Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
             Mockito.when(delegate.get()).thenReturn(
                     Flux.just(Lists.list(serviceInstance1, serviceInstance2)));
@@ -282,8 +285,8 @@ void shouldMakeInitialDaleyAfterPropertiesSet() {
                 SERVICE_ID, "127.0.0.1", port, false);

         StepVerifier.withVirtualTime(() -> {
-            ServiceInstanceListSupplier delegate = Mockito
-                    .mock(ServiceInstanceListSupplier.class);
+            ServiceInstanceListSupplier delegate = mock(
+                    ServiceInstanceListSupplier.class);
             Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
             Mockito.when(delegate.get())
                     .thenReturn(Flux.just(Lists.list(serviceInstance1)));
@@ -314,14 +317,14 @@ void shouldRepeatIsAliveChecksIndefinitely() {
                 SERVICE_ID, "127.0.0.2", port, false);

         StepVerifier.withVirtualTime(() -> {
-            ServiceInstanceListSupplier delegate = Mockito
-                    .mock(ServiceInstanceListSupplier.class);
+            ServiceInstanceListSupplier delegate = mock(
+                    ServiceInstanceListSupplier.class);
             Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
             Mockito.when(delegate.get()).thenReturn(
                     Flux.just(Lists.list(serviceInstance1, serviceInstance2)));

-            HealthCheckServiceInstanceListSupplier mock = Mockito
-                    .mock(HealthCheckServiceInstanceListSupplier.class);
+            HealthCheckServiceInstanceListSupplier mock = mock(
+                    HealthCheckServiceInstanceListSupplier.class);
             Mockito.doReturn(Mono.just(false), Mono.just(true)).when(mock)
                     .isAlive(serviceInstance1);
             Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(mock)
@@ -352,14 +355,14 @@ void shouldTimeoutIsAliveCheck() {
                 SERVICE_ID, "127.0.0.1", port, false);

         StepVerifier.withVirtualTime(() -> {
-            ServiceInstanceListSupplier delegate = Mockito
-                    .mock(ServiceInstanceListSupplier.class);
+            ServiceInstanceListSupplier delegate = mock(
+                    ServiceInstanceListSupplier.class);
             Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
             Mockito.when(delegate.get())
                     .thenReturn(Flux.just(Lists.list(serviceInstance1)));

-            HealthCheckServiceInstanceListSupplier mock = Mockito
-                    .mock(HealthCheckServiceInstanceListSupplier.class);
+            HealthCheckServiceInstanceListSupplier mock = mock(
+                    HealthCheckServiceInstanceListSupplier.class);
             Mockito.when(mock.isAlive(serviceInstance1)).thenReturn(Mono.never(),
                     Mono.just(true));

@@ -391,8 +394,8 @@ void shouldUpdateInstances() {
                 SERVICE_ID, "127.0.0.2", port, false);

         StepVerifier.withVirtualTime(() -> {
-            ServiceInstanceListSupplier delegate = Mockito
-                    .mock(ServiceInstanceListSupplier.class);
+            ServiceInstanceListSupplier delegate = mock(
+                    ServiceInstanceListSupplier.class);
             Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
             Flux<List<ServiceInstance>> instances = Flux
                     .just(Lists.list(serviceInstance1))
@@ -421,6 +424,39 @@ protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
                 .verify(VERIFY_TIMEOUT);
     }

+    @Test
+    void shouldRefetchInstances() {
+        healthCheck.setInitialDelay(1000);
+        healthCheck.setRepeatHealthCheck(false);
+        healthCheck.setRefetchInstancesInterval(Duration.ofSeconds(1));
+        healthCheck.setRefetchInstances(true);
+        ServiceInstance serviceInstance1 = new DefaultServiceInstance("ignored-service-1",
+                SERVICE_ID, "127.0.0.1", port, false);
+        ServiceInstance serviceInstance2 = new DefaultServiceInstance("ignored-service-2",
+                SERVICE_ID, "127.0.0.2", port, false);
+
+        StepVerifier.withVirtualTime(() -> {
+            ServiceInstanceListSupplier delegate = mock(
+                    ServiceInstanceListSupplier.class);
+            when(delegate.get())
+                    .thenReturn(Flux.just(Collections.singletonList(serviceInstance1)))
+                    .thenReturn(Flux.just(Collections.singletonList(serviceInstance2)));
+            listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
+                    healthCheck, webClient) {
+                @Override
+                protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
+                    return Mono.just(true);
+                }
+            };
+            return listSupplier.get();
+        }).expectSubscription()
+                .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
+                .expectNext(Lists.list(serviceInstance1))
+                .thenAwait(healthCheck.getRefetchInstancesInterval())
+                .expectNext(Lists.list(serviceInstance2)).thenCancel()
+                .verify(VERIFY_TIMEOUT);
+    }
+
     @Test
     void shouldCacheResultIfAfterPropertiesSetInvoked() {
         healthCheck.setInitialDelay(1000);
@@ -430,8 +466,8 @@ void shouldCacheResultIfAfterPropertiesSetInvoked() {
         AtomicInteger emitCounter = new AtomicInteger();

         StepVerifier.withVirtualTime(() -> {
-            ServiceInstanceListSupplier delegate = Mockito
-                    .mock(ServiceInstanceListSupplier.class);
+            ServiceInstanceListSupplier delegate = mock(
+                    ServiceInstanceListSupplier.class);
             Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
             Mockito.when(delegate.get())
                     .thenReturn(Flux.just(Lists.list(serviceInstance1)));
@@ -468,8 +504,7 @@ void shouldCancelSubscription() {

         final AtomicInteger instancesCanceled = new AtomicInteger();
         final AtomicBoolean subscribed = new AtomicBoolean();
-        ServiceInstanceListSupplier delegate = Mockito
-                .mock(ServiceInstanceListSupplier.class);
+        ServiceInstanceListSupplier delegate = mock(ServiceInstanceListSupplier.class);
         Mockito.when(delegate.get())
                 .thenReturn(Flux.<List<ServiceInstance>>never()
                         .doOnSubscribe(subscription -> subscribed.set(true))
