Skip to content

Support same service instance preference #862

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/src/main/asciidoc/spring-cloud-commons.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,28 @@ public class CustomLoadBalancerConfiguration {

WARNING: `HealthCheckServiceInstanceListSupplier` has its own caching mechanism based on Reactor Flux `replay()`. Therefore, if it's being used, you may want to skip wrapping that supplier with `CachingServiceInstanceListSupplier`.

=== Same instance preference for LoadBalancer

You can set up the LoadBalancer in such a way that it prefers the instance that was previously selected, if that instance is available.

For that, you need to use `SameInstancePreferenceServiceInstanceListSupplier`. You can configure it either by setting the value of `spring.cloud.loadbalancer.configurations` to `same-instance-preference` or by providing your own `ServiceInstanceListSupplier` bean -- for example:

[source,java,indent=0]
----
public class CustomLoadBalancerConfiguration {

@Bean
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder()
.withDiscoveryClient()
.withSameInstancePreference()
.build(context);
}
}
----

TIP: This is also a replacement for Zookeeper `StickyRule`.

[[spring-cloud-loadbalancer-starter]]
=== Spring Cloud LoadBalancer Starter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ public ServiceInstanceListSupplier healthCheckDiscoveryClientServiceInstanceList
.withHealthChecks().build(context);
}

@Bean
@ConditionalOnBean(ReactiveDiscoveryClient.class)
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations",
havingValue = "same-instance-preference")
public ServiceInstanceListSupplier sameInstancePreferenceServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder().withDiscoveryClient()
.withSameInstancePreference().build(context);
}

@Bean
@ConditionalOnBean(ReactiveDiscoveryClient.class)
@ConditionalOnMissingBean
Expand Down Expand Up @@ -157,6 +168,17 @@ public ServiceInstanceListSupplier healthCheckDiscoveryClientServiceInstanceList
.withHealthChecks().build(context);
}

@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations",
havingValue = "same-instance-preference")
public ServiceInstanceListSupplier sameInstancePreferenceServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient()
.withSameInstancePreference().build(context);
}

@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,28 @@ public Mono<Response<ServiceInstance>> choose(Request request) {
if (serviceInstanceListSupplierProvider != null) {
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get().next().map(this::getInstanceResponse);
return supplier.get().next()
.map(serviceInstances -> processInstanceResponse(supplier,
serviceInstances));
}
ServiceInstanceSupplier supplier = this.serviceInstanceSupplier
.getIfAvailable(NoopServiceInstanceSupplier::new);
return supplier.get().collectList().map(this::getInstanceResponse);
}

private Response<ServiceInstance> processInstanceResponse(
ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances) {
Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(
serviceInstances);
if (supplier instanceof SelectedInstanceCallback
&& serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier)
.selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}

private Response<ServiceInstance> getInstanceResponse(
List<ServiceInstance> instances) {
if (instances.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.loadbalancer.core;

import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;

import org.springframework.cloud.client.ServiceInstance;

/**
* An implementation of {@link ServiceInstanceListSupplier} that selects the previously
* chosen instance if it's available.
*
* @author Olga Maciaszek-Sharma
* @since 3.0.0
*/
public class SameInstancePreferenceServiceInstanceListSupplier extends
DelegatingServiceInstanceListSupplier implements SelectedInstanceCallback {

private static final Log LOG = LogFactory
.getLog(SameInstancePreferenceServiceInstanceListSupplier.class);

private ServiceInstance previouslyReturnedInstance;

public SameInstancePreferenceServiceInstanceListSupplier(
ServiceInstanceListSupplier delegate) {
super(delegate);
}

@Override
public String getServiceId() {
return delegate.getServiceId();
}

@Override
public Flux<List<ServiceInstance>> get() {
return delegate.get().map(this::filteredBySameInstancePreference);
}

private List<ServiceInstance> filteredBySameInstancePreference(
List<ServiceInstance> serviceInstances) {
if (previouslyReturnedInstance != null
&& serviceInstances.contains(previouslyReturnedInstance)) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"Returning previously selected service instance: %s",
previouslyReturnedInstance));
}
return Collections.singletonList(previouslyReturnedInstance);
}
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"Previously selected service instance %s was not available. Returning all the instances returned by delegate.",
previouslyReturnedInstance));
}
previouslyReturnedInstance = null;
return serviceInstances;
}

@Override
public void selectedServiceInstance(ServiceInstance serviceInstance) {
if (previouslyReturnedInstance == null
|| !previouslyReturnedInstance.equals(serviceInstance)) {
previouslyReturnedInstance = serviceInstance;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.loadbalancer.core;

import org.springframework.cloud.client.ServiceInstance;

/**
* A callback interface that allows to pass the selected service instance data from the
* LoadBalancer.
*
* @author Olga Maciaszek-Sharma
*/
public interface SelectedInstanceCallback {

/**
* Passes the selected {@link ServiceInstance} as an argument.
* @param serviceInstance that has been selected
*/
void selectedServiceInstance(ServiceInstance serviceInstance);

}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,19 @@ public ServiceInstanceListSupplierBuilder withHealthChecks() {
return this;
}

/**
* Adds a {@link SameInstancePreferenceServiceInstanceListSupplier} to the
* {@link ServiceInstanceListSupplier} hierarchy.
* @return the {@link ServiceInstanceListSupplierBuilder} object
*/
public ServiceInstanceListSupplierBuilder withSameInstancePreference() {
DelegateCreator creator = (context,
delegate) -> new SameInstancePreferenceServiceInstanceListSupplier(
delegate);
this.creators.add(creator);
return this;
}

/**
* Adds a {@link HealthCheckServiceInstanceListSupplier} that uses user-provided
* {@link WebClient} instance to the {@link ServiceInstanceListSupplier} hierarchy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package org.springframework.cloud.loadbalancer.core;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

Expand All @@ -45,13 +47,17 @@
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.cloud.loadbalancer.support.ServiceInstanceListSuppliers;
import org.springframework.cloud.loadbalancer.support.ServiceInstanceSuppliers;
import org.springframework.cloud.loadbalancer.support.SimpleObjectProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.core.ResolvableType;
import org.springframework.core.env.Environment;
import org.springframework.test.context.junit4.SpringRunner;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.BDDAssertions.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* @author Spencer Gibb
Expand Down Expand Up @@ -165,6 +171,22 @@ public void canPassHintViaRequest() {
assertThat(serviceInstance.getServiceId()).isEqualTo("test2");
}

@Test
public void selectedInstanceCallback() {
String serviceId = "test1";
ServiceInstance serviceInstance = instance(serviceId, "1host", false);
SameInstancePreferenceServiceInstanceListSupplier supplier = mock(
SameInstancePreferenceServiceInstanceListSupplier.class);
when(supplier.get())
.thenReturn(Flux.just(Collections.singletonList(serviceInstance)));
RoundRobinLoadBalancer loadBalancer = new RoundRobinLoadBalancer(
new SimpleObjectProvider<>(supplier), serviceId);

loadBalancer.choose().block();

verify(supplier).selectedServiceInstance(serviceInstance);
}

private static class TestHintLoadBalancer extends RoundRobinLoadBalancer {

TestHintLoadBalancer(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.loadbalancer.core;

import java.util.Arrays;
import java.util.List;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Tests for {@link SameInstancePreferenceServiceInstanceListSupplier}.
*
* @author Olga Maciaszek-Sharma
*/
class SameInstancePreferenceServiceInstanceListSupplierTests {

private final DiscoveryClientServiceInstanceListSupplier delegate = mock(
DiscoveryClientServiceInstanceListSupplier.class);

private final SameInstancePreferenceServiceInstanceListSupplier supplier = new SameInstancePreferenceServiceInstanceListSupplier(
delegate);

private final ServiceInstance first = serviceInstance("test-1");

private final ServiceInstance second = serviceInstance("test-2");

private final ServiceInstance third = serviceInstance("test-3");

@Test
void shouldReturnPreviouslySelectedInstanceIfAvailable() {
when(delegate.get()).thenReturn(Flux.just(Arrays.asList(first, second, third)));
supplier.selectedServiceInstance(first);

List<ServiceInstance> instances = supplier.get().blockFirst();

assertThat(instances).hasSize(1);
assertThat(instances.get(0)).isEqualTo(first);
}

@Test
void shouldReturnAllInstancesFromDelegateIfNoPreviouslySelectedInstance() {
when(delegate.get()).thenReturn(Flux.just(Arrays.asList(first, second, third)));

List<ServiceInstance> instances = supplier.get().blockFirst();

assertThat(instances).hasSize(3);
}

@Test
void shouldReturnAllInstancesFromDelegateIfPreviouslySelectedInstanceIfAvailable() {
when(delegate.get()).thenReturn(Flux.just(Arrays.asList(second, third)));
supplier.selectedServiceInstance(first);

List<ServiceInstance> instances = supplier.get().blockFirst();

assertThat(instances).hasSize(2);
}

private DefaultServiceInstance serviceInstance(String instanceId) {
return new DefaultServiceInstance(instanceId, "test", "http://test.test", 9080,
false);
}

}