Externalization of authExceptionRetryInterval for listeners and other improvements #44425

Closed
leonardowestphal opened this issue Feb 24, 2025 · 7 comments
Labels
status: duplicate A duplicate of another issue

Comments

@leonardowestphal

Expected Behavior

I would like to suggest externalizing the authExceptionRetryInterval property used in the org.springframework.kafka.listener.ConsumerProperties class, allowing its configuration through application.properties.

Something like: spring.kafka.listener.auth-exception-retry-interval=5000

It would also be interesting if there were an option to configure a backoff policy or even allow the implementation of a custom handler for fatal exceptions like AuthenticationException.

Current Behavior

Currently, this configuration must be implemented manually in ConcurrentKafkaListenerContainerFactory and only supports a fixed retry interval.

Example:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
	ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory);
	factory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofMillis(4000L));

	return factory;
}

Context

This way, we would avoid the need to implement code to activate this mechanism, which provides resilience for consumer applications.

@leonardowestphal leonardowestphal added status: waiting-for-triage An issue we've not yet triaged type: enhancement A general enhancement labels Feb 24, 2025
@artembilan
Member

The spring.kafka properties have nothing to do with this project.
They are a Spring Boot feature: https://docs.spring.io/spring-boot/reference/messaging/kafka.html
We will transfer this issue over there accordingly.

@wilkinsona wilkinsona transferred this issue from spring-projects/spring-kafka Feb 24, 2025
@wilkinsona
Member

A property for authExceptionRetryInterval has been added recently in #44199.

> It would also be interesting if there were an option to configure a backoff policy or even allow the implementation of a custom handler for fatal exceptions like AuthenticationException.

Can you please provide an example of what you have in mind here?

@artembilan
Member

We already have publisher.publishEvent(new ConsumerRetryAuthEvent(this, this.thisOrParentContainer, reason)); which might give you an opportunity to deal with those errors in a retry manner.

@wilkinsona wilkinsona added status: waiting-for-feedback We need additional information before we can continue and removed type: enhancement A general enhancement labels Feb 24, 2025
@leonardowestphal
Author

> A property for authExceptionRetryInterval has been added recently in #44199.

Nice, thanks!

> It would also be interesting if there were an option to configure a backoff policy or even allow the implementation of a custom handler for fatal exceptions like AuthenticationException.
>
> Can you please provide an example of what you have in mind here?

I’m not sure if my understanding is 100% correct, so I ask you to check whether what I’m suggesting makes sense or not.

From my analysis, the handling of fatal authentication errors occurs in the run() method of KafkaMessageListenerContainer<K, V>.ListenerConsumer, as shown in the code below. However, the handling of errors during the processing of a record happens at a different moment, and all handling mechanisms for this type of error only work for non-fatal errors.

catch (AuthorizationException | AuthenticationException ae) {
	if (this.authExceptionRetryInterval == null) {
		this.logger.error(ae, "Authentication/Authorization Exception and no authExceptionRetryInterval set");
		this.fatalError = true;
		exitThrowable = ae;
		break;
	}

	this.logger.error(ae, "Authentication/Authorization Exception, retrying in " + this.authExceptionRetryInterval.toMillis() + " ms");
	KafkaMessageListenerContainer.this.publishRetryAuthEvent(ae);
	failedAuthRetry = true;
	this.sleepFor(this.authExceptionRetryInterval);
}

It would be possible to implement what I have in mind in different ways, just as it is for other types of errors, but I will try to provide a general idea.

// Perhaps there could be something similar to the @RetryableTopic annotation, but specifically for fatal errors, allowing the definition of a dedicated retry policy.
// Maybe an attribute similar to include could be added to handle specific fatal errors.
// In my view, handling fatal errors such as authentication failures should be different from handling errors during record processing. That's why the idea was to create something specific and separate.
@KafkaListener(topics = "my-topic-a", concurrency = "2")
@RetryableTopic(backoff = @Backoff(delay = 5000, multiplier = 2, maxDelay = 50000), attempts = "3", include = AuthenticationException.class)
public void test(ConsumerRecord<Integer, String> record) throws InterruptedException{
	//processing logic...
}

// Another approach would be to allow the creation of a specific ErrorHandler for fatal exceptions (e.g. DefaultFatalErrorHandler), similar to DefaultErrorHandler. In it, we could also define our retry policy.
@Bean
public DefaultErrorHandler errorHandler() {
	BackOff fixedBackOff = new FixedBackOff(4500, 5);
	DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
		//processing logic...
	}, fixedBackOff);
	errorHandler.addRetryableExceptions(AuthenticationException.class);

	return errorHandler;
}
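
For reference, the wait times that a backoff with the values in the first snippet (delay = 5000, multiplier = 2, maxDelay = 50000) would produce can be sketched in plain Java. This is only an illustration of the arithmetic, assuming the first wait equals the initial delay and each later wait is the previous one multiplied and then capped. `BackoffSchedule` and `delays` are hypothetical names, not part of spring-kafka or spring-retry.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: illustrates exponential backoff arithmetic only.
public class BackoffSchedule {

    /**
     * Returns the wait before each retry. The first wait is initialDelay;
     * each later wait is the previous one times multiplier, capped at maxDelay.
     */
    static List<Duration> delays(Duration initialDelay, double multiplier, Duration maxDelay, int retries) {
        List<Duration> result = new ArrayList<>();
        long capMillis = maxDelay.toMillis();
        long current = Math.min(initialDelay.toMillis(), capMillis);
        for (int i = 0; i < retries; i++) {
            result.add(Duration.ofMillis(current));
            // Grow the delay for the next retry, never exceeding the cap.
            current = (long) Math.min(current * multiplier, (double) capMillis);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the @Backoff attributes in the proposal above.
        List<Duration> schedule = delays(Duration.ofMillis(5000), 2.0, Duration.ofMillis(50000), 5);
        schedule.forEach(d -> System.out.println(d.toMillis() + " ms"));
        // Prints 5000, 10000, 20000, 40000 and 50000 ms, one per line.
    }
}
```

With a fixed authExceptionRetryInterval, every entry in this list would be the same value, which is the limitation the proposal is about.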

Again, if my understanding is incorrect, please let me know.

Thank you in advance.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Feb 24, 2025
@leonardowestphal
Author

> We already have publisher.publishEvent(new ConsumerRetryAuthEvent(this, this.thisOrParentContainer, reason)); which might give you an opportunity to deal with those errors in a retry manner.

I don’t see how I could use this together with a listener created with the @KafkaListener annotation. I would have to reimplement my consumers in a different way! How did you envision its usage?

@artembilan
Member

The AuthenticationException is a special connection error that has nothing to do with the user's consumer logic, which is where that retry and back-off logic is actually applied.
Therefore, we cannot apply a similar configuration to this kind of error.

The ConsumerRetryAuthEvent has everything you need with regard to the mentioned @KafkaListener.
The latter registers a ListenerContainer object in the KafkaListenerEndpointRegistry.
Therefore, you can determine from that event exactly which @KafkaListener is failing to connect due to an auth error.
So, you can simply stop it from your @EventListener.
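
A minimal sketch of that idea, under stated assumptions: the counting logic below is plain Java so it can run on its own, and `AuthRetryTracker`, `recordRetryAndCheckLimit` and the limit of 3 are hypothetical names and values chosen for illustration. The Spring wiring is shown only as a comment and should be checked against the spring-kafka version in use.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts auth retries per listener and signals when to stop.
public class AuthRetryTracker {

    private final int maxRetries;
    private final Map<String, AtomicInteger> retriesByListener = new ConcurrentHashMap<>();

    public AuthRetryTracker(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Records one auth retry for the listener; returns true once the limit is exceeded. */
    public boolean recordRetryAndCheckLimit(String listenerId) {
        int count = retriesByListener
                .computeIfAbsent(listenerId, id -> new AtomicInteger())
                .incrementAndGet();
        return count > maxRetries;
    }

    /** Clears the counter, for example after the listener has recovered or was stopped. */
    public void reset(String listenerId) {
        retriesByListener.remove(listenerId);
    }

    // Assumed wiring inside a Spring bean (not compiled as part of this sketch):
    //
    // @EventListener
    // public void onAuthRetry(ConsumerRetryAuthEvent event) {
    //     MessageListenerContainer container = event.getContainer(MessageListenerContainer.class);
    //     if (tracker.recordRetryAndCheckLimit(container.getListenerId())) {
    //         container.stop();
    //     }
    // }

    public static void main(String[] args) {
        AuthRetryTracker tracker = new AuthRetryTracker(3);
        for (int i = 1; i <= 4; i++) {
            boolean stop = tracker.recordRetryAndCheckLimit("my-listener");
            System.out.println("retry " + i + " -> stop? " + stop);
        }
        // The first three retries print false; the fourth prints true.
    }
}
```

Since the event is published from the consumer thread, it may be preferable to call stop() from a different thread, and to reset the counter when a ConsumerRetryAuthSuccessfulEvent arrives.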

Either way, this looks like a totally different story, unrelated to the original request to expose the property.

Please confirm, and we can close this as a duplicate of the mentioned issue, which has the desired fix.

@leonardowestphal
Author

You can close this. My mistake was assuming that the property was linked to spring-kafka, which caused the topics to get mixed up... my bad.

My idea was to implement a more customized authentication retry to prevent the container from stopping and needing a restart. But for now, the retry using AuthExceptionRetryInterval meets my needs. I will take a closer look at this ConsumerRetryAuthEvent issue. Thanks!

@philwebb philwebb added status: duplicate A duplicate of another issue and removed status: waiting-for-triage An issue we've not yet triaged status: feedback-provided Feedback has been provided labels Feb 24, 2025

5 participants