diff --git a/docs/src/main/asciidoc/spring-cloud-commons.adoc b/docs/src/main/asciidoc/spring-cloud-commons.adoc index 02da90307..6066295c7 100644 --- a/docs/src/main/asciidoc/spring-cloud-commons.adoc +++ b/docs/src/main/asciidoc/spring-cloud-commons.adoc @@ -995,6 +995,28 @@ public class CustomLoadBalancerConfiguration { WARNING: `HealthCheckServiceInstanceListSupplier` has its own caching mechanism based on Reactor Flux `replay()`. Therefore, if it's being used, you may want to skip wrapping that supplier with `CachingServiceInstanceListSupplier`. +=== Same instance preference for LoadBalancer + +You can set up the LoadBalancer in such a way that it prefers the instance that was previously selected, if that instance is available. + +For that, you need to use `SameInstancePreferenceServiceInstanceListSupplier`. You can configure it either by setting the value of `spring.cloud.loadbalancer.configurations` to `same-instance-preference` or by providing your own `ServiceInstanceListSupplier` bean -- for example: + +[source,java,indent=0] +---- +public class CustomLoadBalancerConfiguration { + + @Bean + public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier( + ConfigurableApplicationContext context) { + return ServiceInstanceListSupplier.builder() + .withDiscoveryClient() + .withSameInstancePreference() + .build(context); + } + } +---- + +TIP: This is also a replacement for Zookeeper `StickyRule`. [[spring-cloud-loadbalancer-starter]] === Spring Cloud LoadBalancer Starter diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java index 39d1016af..571549003 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java @@ -100,6 +100,17 @@ public ServiceInstanceListSupplier healthCheckDiscoveryClientServiceInstanceList .withHealthChecks().build(context); } + @Bean + @ConditionalOnBean(ReactiveDiscoveryClient.class) + @ConditionalOnMissingBean + @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", + havingValue = "same-instance-preference") + public ServiceInstanceListSupplier sameInstancePreferenceServiceInstanceListSupplier( + ConfigurableApplicationContext context) { + return ServiceInstanceListSupplier.builder().withDiscoveryClient() + .withSameInstancePreference().build(context); + } + @Bean @ConditionalOnBean(ReactiveDiscoveryClient.class) @ConditionalOnMissingBean @@ -157,6 +168,17 @@ public ServiceInstanceListSupplier healthCheckDiscoveryClientServiceInstanceList .withHealthChecks().build(context); } + @Bean + @ConditionalOnBean(DiscoveryClient.class) + @ConditionalOnMissingBean + @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", + havingValue = "same-instance-preference") + public ServiceInstanceListSupplier sameInstancePreferenceServiceInstanceListSupplier( + ConfigurableApplicationContext context) { + return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient() + .withSameInstancePreference().build(context); + } + @Bean @ConditionalOnBean(DiscoveryClient.class) @ConditionalOnMissingBean diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinLoadBalancer.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinLoadBalancer.java index be7a59df0..a8b097635 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinLoadBalancer.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinLoadBalancer.java @@ -115,13 +115,28 @@ public Mono> choose(Request request) { if (serviceInstanceListSupplierProvider != null) { ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider .getIfAvailable(NoopServiceInstanceListSupplier::new); - return supplier.get().next().map(this::getInstanceResponse); + return supplier.get().next() + .map(serviceInstances -> processInstanceResponse(supplier, + serviceInstances)); } ServiceInstanceSupplier supplier = this.serviceInstanceSupplier .getIfAvailable(NoopServiceInstanceSupplier::new); return supplier.get().collectList().map(this::getInstanceResponse); } + private Response processInstanceResponse( + ServiceInstanceListSupplier supplier, + List serviceInstances) { + Response serviceInstanceResponse = getInstanceResponse( + serviceInstances); + if (supplier instanceof SelectedInstanceCallback + && serviceInstanceResponse.hasServer()) { + ((SelectedInstanceCallback) supplier) + .selectedServiceInstance(serviceInstanceResponse.getServer()); + } + return serviceInstanceResponse; + } + private Response getInstanceResponse( List instances) { if (instances.isEmpty()) { diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/SameInstancePreferenceServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/SameInstancePreferenceServiceInstanceListSupplier.java new file mode 100644 index 000000000..ff875a641 --- /dev/null +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/SameInstancePreferenceServiceInstanceListSupplier.java @@ -0,0 +1,86 @@ +/* + * Copyright 2012-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.loadbalancer.core; + +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Flux; + +import org.springframework.cloud.client.ServiceInstance; + +/** + * An implementation of {@link ServiceInstanceListSupplier} that selects the previously + * chosen instance if it's available. + * + * @author Olga Maciaszek-Sharma + * @since 3.0.0 + */ +public class SameInstancePreferenceServiceInstanceListSupplier extends + DelegatingServiceInstanceListSupplier implements SelectedInstanceCallback { + + private static final Log LOG = LogFactory + .getLog(SameInstancePreferenceServiceInstanceListSupplier.class); + + private ServiceInstance previouslyReturnedInstance; + + public SameInstancePreferenceServiceInstanceListSupplier( + ServiceInstanceListSupplier delegate) { + super(delegate); + } + + @Override + public String getServiceId() { + return delegate.getServiceId(); + } + + @Override + public Flux> get() { + return delegate.get().map(this::filteredBySameInstancePreference); + } + + private List filteredBySameInstancePreference( + List serviceInstances) { + if (previouslyReturnedInstance != null + && serviceInstances.contains(previouslyReturnedInstance)) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Returning previously selected service instance: %s", + previouslyReturnedInstance)); + } + return Collections.singletonList(previouslyReturnedInstance); + } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Previously selected service instance %s was not available. Returning all the instances returned by delegate.", + previouslyReturnedInstance)); + } + previouslyReturnedInstance = null; + return serviceInstances; + } + + @Override + public void selectedServiceInstance(ServiceInstance serviceInstance) { + if (previouslyReturnedInstance == null + || !previouslyReturnedInstance.equals(serviceInstance)) { + previouslyReturnedInstance = serviceInstance; + } + } + +} diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/SelectedInstanceCallback.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/SelectedInstanceCallback.java new file mode 100644 index 000000000..6956b2eaa --- /dev/null +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/SelectedInstanceCallback.java @@ -0,0 +1,35 @@ +/* + * Copyright 2012-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.loadbalancer.core; + +import org.springframework.cloud.client.ServiceInstance; + +/** + * A callback interface that allows to pass the selected service instance data from the + * LoadBalancer. + * + * @author Olga Maciaszek-Sharma + */ +public interface SelectedInstanceCallback { + + /** + * Passes the selected {@link ServiceInstance} as an argument. + * @param serviceInstance that has been selected + */ + void selectedServiceInstance(ServiceInstance serviceInstance); + +} diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java index 8723d3803..51222ec8e 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java @@ -125,6 +125,19 @@ public ServiceInstanceListSupplierBuilder withHealthChecks() { return this; } + /** + * Adds a {@link SameInstancePreferenceServiceInstanceListSupplier} to the + * {@link ServiceInstanceListSupplier} hierarchy. + * @return the {@link ServiceInstanceListSupplierBuilder} object + */ + public ServiceInstanceListSupplierBuilder withSameInstancePreference() { + DelegateCreator creator = (context, + delegate) -> new SameInstancePreferenceServiceInstanceListSupplier( + delegate); + this.creators.add(creator); + return this; + } + /** * Adds a {@link HealthCheckServiceInstanceListSupplier} that uses user-provided * {@link WebClient} instance to the {@link ServiceInstanceListSupplier} hierarchy. diff --git a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/LoadBalancerTests.java b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/LoadBalancerTests.java index 5fc6b3dfc..327c1765c 100644 --- a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/LoadBalancerTests.java +++ b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/LoadBalancerTests.java @@ -17,10 +17,12 @@ package org.springframework.cloud.loadbalancer.core; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -45,6 +47,7 @@ import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory; import org.springframework.cloud.loadbalancer.support.ServiceInstanceListSuppliers; import org.springframework.cloud.loadbalancer.support.ServiceInstanceSuppliers; +import org.springframework.cloud.loadbalancer.support.SimpleObjectProvider; import org.springframework.context.annotation.Bean; import org.springframework.core.ResolvableType; import org.springframework.core.env.Environment; @@ -52,6 +55,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.BDDAssertions.then; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * @author Spencer Gibb @@ -165,6 +171,22 @@ public void canPassHintViaRequest() { assertThat(serviceInstance.getServiceId()).isEqualTo("test2"); } + @Test + public void selectedInstanceCallback() { + String serviceId = "test1"; + ServiceInstance serviceInstance = instance(serviceId, "1host", false); + SameInstancePreferenceServiceInstanceListSupplier supplier = mock( + SameInstancePreferenceServiceInstanceListSupplier.class); + when(supplier.get()) + .thenReturn(Flux.just(Collections.singletonList(serviceInstance))); + RoundRobinLoadBalancer loadBalancer = new RoundRobinLoadBalancer( + new SimpleObjectProvider<>(supplier), serviceId); + + loadBalancer.choose().block(); + + verify(supplier).selectedServiceInstance(serviceInstance); + } + private static class TestHintLoadBalancer extends RoundRobinLoadBalancer { TestHintLoadBalancer( diff --git a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/SameInstancePreferenceServiceInstanceListSupplierTests.java b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/SameInstancePreferenceServiceInstanceListSupplierTests.java new file mode 100644 index 000000000..ecc1a3f09 --- /dev/null +++ b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/SameInstancePreferenceServiceInstanceListSupplierTests.java @@ -0,0 +1,86 @@ +/* + * Copyright 2012-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.loadbalancer.core; + +import java.util.Arrays; +import java.util.List; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import org.springframework.cloud.client.DefaultServiceInstance; +import org.springframework.cloud.client.ServiceInstance; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link SameInstancePreferenceServiceInstanceListSupplier}. + * + * @author Olga Maciaszek-Sharma + */ +class SameInstancePreferenceServiceInstanceListSupplierTests { + + private final DiscoveryClientServiceInstanceListSupplier delegate = mock( + DiscoveryClientServiceInstanceListSupplier.class); + + private final SameInstancePreferenceServiceInstanceListSupplier supplier = new SameInstancePreferenceServiceInstanceListSupplier( + delegate); + + private final ServiceInstance first = serviceInstance("test-1"); + + private final ServiceInstance second = serviceInstance("test-2"); + + private final ServiceInstance third = serviceInstance("test-3"); + + @Test + void shouldReturnPreviouslySelectedInstanceIfAvailable() { + when(delegate.get()).thenReturn(Flux.just(Arrays.asList(first, second, third))); + supplier.selectedServiceInstance(first); + + List instances = supplier.get().blockFirst(); + + assertThat(instances).hasSize(1); + assertThat(instances.get(0)).isEqualTo(first); + } + + @Test + void shouldReturnAllInstancesFromDelegateIfNoPreviouslySelectedInstance() { + when(delegate.get()).thenReturn(Flux.just(Arrays.asList(first, second, third))); + + List instances = supplier.get().blockFirst(); + + assertThat(instances).hasSize(3); + } + + @Test + void shouldReturnAllInstancesFromDelegateIfPreviouslySelectedInstanceIfAvailable() { + when(delegate.get()).thenReturn(Flux.just(Arrays.asList(second, third))); + supplier.selectedServiceInstance(first); + + List instances = supplier.get().blockFirst(); + + assertThat(instances).hasSize(2); + } + + private DefaultServiceInstance serviceInstance(String instanceId) { + return new DefaultServiceInstance(instanceId, "test", "http://test.test", 9080, + false); + } + +}