Skip to content

GH-2220: Fix TopicPartitionOffset for Retry Topics #2223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ public KafkaTemplate<String, Object> kafkaTemplate() {
----
====

IMPORTANT: Multiple `@KafkaListener` annotations can be used for the same topic with or without manual partition assignment along with non-blocking retries, but only one configuration will be used for a given topic.
It's best to use a single `RetryTopicConfiguration` bean for configuration of such topics; if multiple `@RetryableTopic` annotations are being used for the same topic, all of them should have the same values, otherwise one of them will be applied to all of that topic's listeners and the other annotations' values will be ignored.

==== Features

Most of the features are available both for the `@RetryableTopic` annotation and the `RetryTopicConfiguration` beans.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -219,6 +219,16 @@ public long delay() {
return this.delayMs;
}

/**
* Return the number of partitions the
* retry topics should be created with.
* @return the number of partitions.
* @since 2.7.13
*/
public int numPartitions() {
return this.numPartitions;
}

@Nullable
public Boolean autoStartDltHandler() {
return this.autoStartDltHandler;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
Expand All @@ -42,6 +43,8 @@
*/
public class EndpointCustomizerFactory {

private static final int DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT = 0;

private final DestinationTopic.Properties destinationProperties;

private final EndpointHandlerMethod beanMethod;
Expand All @@ -50,7 +53,7 @@ public class EndpointCustomizerFactory {

private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;

EndpointCustomizerFactory(DestinationTopic.Properties destinationProperties, EndpointHandlerMethod beanMethod,
public EndpointCustomizerFactory(DestinationTopic.Properties destinationProperties, EndpointHandlerMethod beanMethod,
BeanFactory beanFactory, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {

this.destinationProperties = destinationProperties;
Expand All @@ -71,7 +74,14 @@ protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties pr
Collection<EndpointCustomizer.TopicNamesHolder> topics = customizeAndRegisterTopics(namesProvider, endpoint);
endpoint.setId(namesProvider.getEndpointId(endpoint));
endpoint.setGroupId(namesProvider.getGroupId(endpoint));
endpoint.setTopics(topics.stream().map(EndpointCustomizer.TopicNamesHolder::getCustomizedTopic).toArray(String[]::new));
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider,
endpoint.getTopicPartitionsToAssign()));
}
else {
endpoint.setTopics(endpoint.getTopics().stream()
.map(namesProvider::getTopicName).toArray(String[]::new));
}
endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint));
endpoint.setGroup(namesProvider.getGroup(endpoint));
endpoint.setBean(bean);
Expand All @@ -84,6 +94,29 @@ protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties pr
};
}

private static TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties properties,
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
TopicPartitionOffset[] topicPartitionOffsets) {
return Stream.of(topicPartitionOffsets)
.map(tpo -> properties.isMainEndpoint()
? getTPOForMainTopic(namesProvider, tpo)
: getTPOForRetryTopics(properties, namesProvider, tpo))
.toArray(TopicPartitionOffset[]::new);
}

private static TopicPartitionOffset getTPOForRetryTopics(DestinationTopic.Properties properties, RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, TopicPartitionOffset tpo) {
return new TopicPartitionOffset(namesProvider.getTopicName(tpo.getTopic()),
tpo.getPartition() <= properties.numPartitions() ? tpo.getPartition() : DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT,
(Long) null);
}

private static TopicPartitionOffset getTPOForMainTopic(RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, TopicPartitionOffset tpo) {
TopicPartitionOffset newTpo = new TopicPartitionOffset(namesProvider.getTopicName(tpo.getTopic()),
tpo.getPartition(), tpo.getOffset(), tpo.getPosition());
newTpo.setRelativeToCurrent(tpo.isRelativeToCurrent());
return newTpo;
}

protected Collection<EndpointCustomizer.TopicNamesHolder> customizeAndRegisterTopics(
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
MethodKafkaListenerEndpoint<?, ?> endpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,14 @@ private Consumer<Collection<String>> getTopicCreationFunction(RetryTopicConfigur
}

protected void createNewTopicBeans(Collection<String> topics, RetryTopicConfiguration.TopicCreation config) {
topics.forEach(topic ->
((DefaultListableBeanFactory) this.beanFactory)
.registerSingleton(topic + "-topicRegistrationBean",
new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor()))
topics.forEach(topic -> {
DefaultListableBeanFactory bf = ((DefaultListableBeanFactory) this.beanFactory);
String beanName = topic + "-topicRegistrationBean";
if (!bf.containsBean(beanName)) {
bf.registerSingleton(beanName,
new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor()));
}
}
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.retrytopic;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.kafka.support.TopicPartitionOffset;

/**
* @author Tomaz Fernandes
* @since 2.8.5
*/
@ExtendWith(MockitoExtension.class)
class EndpointCustomizerFactoryTests {

@Mock
private DestinationTopic.Properties properties;

@Mock
private EndpointHandlerMethod beanMethod;

@Mock
private BeanFactory beanFactory;

@Mock
private RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;

@Mock
private MethodKafkaListenerEndpoint<?, ?> endpoint;

private final String[] topics = {"myTopic1", "myTopic2"};

private final Method method = EndpointCustomizerFactory.class.getDeclaredMethods()[0];

@Test
void shouldNotCustomizeEndpointForMainTopicWithTopics() {

given(beanMethod.resolveBean(this.beanFactory)).willReturn(method);
given(endpoint.getTopics()).willReturn(Arrays.asList(topics));
given(properties.suffix()).willReturn("");
RetryTopicNamesProviderFactory.RetryTopicNamesProvider provider =
new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties);
given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider);

EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod,
beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();

List<EndpointCustomizer.TopicNamesHolder> holders =
(List<EndpointCustomizer.TopicNamesHolder>) endpointCustomizer.customizeEndpointAndCollectTopics(endpoint);

assertThat(holders).hasSize(2).element(0)
.matches(assertMainTopic(0));
assertThat(holders).element(1)
.matches(assertMainTopic(1));

}

@Test
void shouldNotCustomizeEndpointForMainTopicWithTPO() {

given(beanMethod.resolveBean(this.beanFactory)).willReturn(method);
given(properties.isMainEndpoint()).willReturn(true);
given(properties.suffix()).willReturn("");
RetryTopicNamesProviderFactory.RetryTopicNamesProvider provider =
new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties);
given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider);

String testString = "testString";
MethodKafkaListenerEndpoint<Object, Object> endpointTPO = new MethodKafkaListenerEndpoint<>();
endpointTPO.setTopicPartitions(new TopicPartitionOffset(topics[0], 0, 0L),
new TopicPartitionOffset(topics[1], 1, 1L));
endpointTPO.setMethod(this.method);
endpointTPO.setId(testString);
endpointTPO.setClientIdPrefix(testString);
endpointTPO.setGroup(testString);

EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod,
beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();

List<EndpointCustomizer.TopicNamesHolder> holders =
(List<EndpointCustomizer.TopicNamesHolder>) endpointCustomizer.customizeEndpointAndCollectTopics(endpointTPO);

assertThat(holders).hasSize(2).element(0)
.matches(assertMainTopic(0));
assertThat(holders).element(1)
.matches(assertMainTopic(1));

assertThat(endpointTPO.getTopics())
.isEmpty();

TopicPartitionOffset[] topicPartitionsToAssign = endpointTPO.getTopicPartitionsToAssign();
assertThat(topicPartitionsToAssign).hasSize(2);
assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[0],
new TopicPartitionOffset(topics[0], 0, 0L))).isTrue();
assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[1],
new TopicPartitionOffset(topics[1], 1, 1L))).isTrue();

}

private Predicate<EndpointCustomizer.TopicNamesHolder> assertMainTopic(int index) {
return holder -> holder.getCustomizedTopic().equals(topics[index])
&& holder.getMainTopic().equals(topics[index]);
}

@Test
void shouldCustomizeEndpointForRetryTopic() {

MethodKafkaListenerEndpoint<Object, Object> endpoint = new MethodKafkaListenerEndpoint<>();
String testString = "testString";
endpoint.setTopics(this.topics);
endpoint.setMethod(this.method);
endpoint.setId(testString);
endpoint.setClientIdPrefix(testString);
endpoint.setGroup(testString);

MethodKafkaListenerEndpoint<Object, Object> endpointTPO = new MethodKafkaListenerEndpoint<>();
endpointTPO.setTopicPartitions(new TopicPartitionOffset(topics[0], 0, 0L),
new TopicPartitionOffset(topics[1], 1, 1L));
endpointTPO.setMethod(this.method);
endpointTPO.setId(testString);
endpointTPO.setClientIdPrefix(testString);
endpointTPO.setGroup(testString);

String suffix = "-retry";
given(beanMethod.resolveBean(this.beanFactory)).willReturn(method);
given(properties.isMainEndpoint()).willReturn(false);
given(properties.suffix()).willReturn(suffix);
given(properties.numPartitions()).willReturn(2);

RetryTopicNamesProviderFactory.RetryTopicNamesProvider provider =
new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties);
given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider);

EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod,
beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();

List<EndpointCustomizer.TopicNamesHolder> holders =
(List<EndpointCustomizer.TopicNamesHolder>) endpointCustomizer.customizeEndpointAndCollectTopics(endpoint);

String topic1WithSuffix = topics[0] + suffix;
String topic2WithSuffix = topics[1] + suffix;
assertThat(holders).hasSize(2).element(0)
.matches(holder -> holder.getMainTopic().equals(topics[0])
&& holder.getCustomizedTopic().equals(topic1WithSuffix));
assertThat(holders).hasSize(2).element(1)
.matches(holder -> holder.getMainTopic().equals(topics[1])
&& holder.getCustomizedTopic().equals(topic2WithSuffix));

String testStringSuffix = testString + suffix;

assertThat(endpoint.getTopics())
.contains(topic1WithSuffix, topic2WithSuffix);
assertThat(endpoint.getId())
.isEqualTo(testStringSuffix);
assertThat(endpoint.getClientIdPrefix())
.isEqualTo(testStringSuffix);
assertThat(endpoint.getGroup())
.isEqualTo(testStringSuffix);
assertThat(endpoint.getTopicPartitionsToAssign()).isEmpty();

List<EndpointCustomizer.TopicNamesHolder> holdersTPO =
(List<EndpointCustomizer.TopicNamesHolder>) endpointCustomizer.customizeEndpointAndCollectTopics(endpointTPO);

assertThat(holdersTPO).hasSize(2).element(0)
.matches(holder -> holder.getMainTopic().equals(topics[0])
&& holder.getCustomizedTopic().equals(topic1WithSuffix));
assertThat(holdersTPO).hasSize(2).element(1)
.matches(holder -> holder.getMainTopic().equals(topics[1])
&& holder.getCustomizedTopic().equals(topic2WithSuffix));

assertThat(endpointTPO.getTopics())
.isEmpty();

TopicPartitionOffset[] topicPartitionsToAssign = endpointTPO.getTopicPartitionsToAssign();
assertThat(topicPartitionsToAssign).hasSize(2);
assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[0],
new TopicPartitionOffset(topic1WithSuffix, 0, (Long) null))).isTrue();
assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[1],
new TopicPartitionOffset(topic2WithSuffix, 1, (Long) null))).isTrue();

assertThat(endpointTPO.getId())
.isEqualTo(testStringSuffix);
assertThat(endpointTPO.getClientIdPrefix())
.isEqualTo(testStringSuffix);
assertThat(endpointTPO.getGroup())
.isEqualTo(testStringSuffix);
}

private boolean equalsTopicPartitionOffset(TopicPartitionOffset tpo1, TopicPartitionOffset tpo2) {
return tpo1.getTopicPartition().equals(tpo2.getTopicPartition()) &&
((tpo1.getOffset() == null && tpo2.getOffset() == null) ||
(tpo1.getOffset() != null && tpo1.getOffset().equals(tpo2.getOffset())));

}
}
Loading