-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Description
I'm using Redis Streams in a Spring Boot application.
Whenever I restart the service, I intermittently get the following error:
```
org.springframework.data.redis.RedisSystemException: Redis exception
Caused by: io.lettuce.core.RedisException: Connection closed
```
`@Slf4j
@configuration
public class RedisStreamConfig {
private static final String START = "start";
private static final String STOP = "stop";
private static final String CONSUMER = "consumer";
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
private final AtomicInteger recoveryAttempts = new AtomicInteger(0);
private final List subscriptions = new CopyOnWriteArrayList<>();
private static final ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
@value("${platform.redis.host}")
private String redisHost;
@value("${platform.redis.port}")
private int redisPort;
@value("${platform.redis.use.ssl}")
private boolean useSsl;
@value("${platform.registry.redis.stream.consumer.count}")
private int consumerCount;
@value("${platform.registry.redis.stream.consumer.polling.interval.ms}")
private int redisStreamConsumerPollingIntervalInMs;
@value("${platform.registry.redis.stream.consumer.messages.per.interval.ms}")
private int redisStreamConsumerMessagesPerPoll;
@value("${platform.registry.redis.stream.consumer.threads}")
private int redisStreamConsumerThreads;
@Autowired
private ApplicationContext applicationContext;
@bean
public RedisConnectionFactory redisConnectionFactory() {
val config = new RedisStandaloneConfiguration(this.redisHost, this.redisPort);
val clientOptions = io.lettuce.core.ClientOptions.builder()
.autoReconnect(true) // ensures lettuce will reconnect if connection is dropped
.disconnectedBehavior(io.lettuce.core.ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
.pingBeforeActivateConnection(true)
.timeoutOptions(TimeoutOptions.enabled())
.socketOptions(SocketOptions.builder()
.keepAlive(true)
.tcpNoDelay(true)
.build())
.build();
val clientConfigBuilder = LettuceClientConfiguration.builder()
.commandTimeout(Duration.ofMinutes(5)) // > this.redisStreamConsumerPollingIntervalInMs
.clientOptions(clientOptions);
if (useSsl) {
clientConfigBuilder.useSsl();
}
val clientConfig = clientConfigBuilder.build();
val factory = new LettuceConnectionFactory(config, clientConfig);
factory.setValidateConnection(true); // ensures connections are alive before use
return factory;
}
@bean
public RedisTemplate<String, Object> redisTemplate(final RedisConnectionFactory factory) {
val template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new StringRedisSerializer());
return template;
}
@bean(initMethod = START, destroyMethod = STOP)
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(final RedisConnectionFactory connectionFactory,
final RedisTemplate<String, Object> redisTemplate,
final InboundVoiceCallRedisStreamConsumer redisStreamConsumer) {
var options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(this.redisStreamConsumerPollingIntervalInMs))
.batchSize(this.redisStreamConsumerMessagesPerPoll)
.executor(Executors.newFixedThreadPool(this.redisStreamConsumerThreads))
.errorHandler(error -> handleRedisStreamError(error, redisTemplate, redisStreamConsumer))
.build();
this.container = StreamMessageListenerContainer.create(connectionFactory, options);
createConsumerGroupSafely(redisTemplate);
registerSubscriptions(redisStreamConsumer);
return this.container;
}
private void createConsumerGroupSafely(final RedisTemplate<String, Object> redisTemplate) {
try {
redisTemplate.opsForStream().createGroup(VOICE_CALL_INBOUND_STREAM, VOICE_CALL_INBOUND_GROUP);
log.info("Created consumer group: {}", VOICE_CALL_INBOUND_GROUP);
} catch (Exception e) {
log.warn("Error creating consumer group for redis stream", e);
}
}
private void registerSubscriptions(final InboundVoiceCallRedisStreamConsumer redisStreamConsumer) {
subscriptions.clear(); // ensure clean slate
for (var i = 1; i <= consumerCount; i++) {
val consumerName = CONSUMER + Symbols.UNDERSCORE + i;
log.info("Registering Redis consumer: {}", consumerName);
val subscription = container.receive(
Consumer.from(VOICE_CALL_INBOUND_GROUP, consumerName),
StreamOffset.create(VOICE_CALL_INBOUND_STREAM, ReadOffset.lastConsumed()),
redisStreamConsumer
);
subscriptions.add(subscription);
}
}
@PreDestroy
public void shutdownContainer() {
if (Objects.nonNull(container)) {
log.info("Stopping StreamMessageListenerContainer before shutdown");
while (container.isRunning()) {
try {
Thread.sleep(50);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
container.stop();
// Wait for subscriptions to be inactive to avoid connection closed errors
subscriptions.forEach(subscription -> {
while (subscription.isActive()) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
}
private void handleRedisStreamError(final Throwable error,
final RedisTemplate<String, Object> redisTemplate,
final InboundVoiceCallRedisStreamConsumer redisStreamConsumer) {
log.error("Redis Stream error occurred: {}", error.getMessage(), error);
// Short-circuit any recovery if context is closing/closed
if (applicationContext instanceof ConfigurableApplicationContext ctx && !ctx.isActive()) {
log.warn("Context is closing/closed, skipping recovery.");
return;
}
int attempt = recoveryAttempts.incrementAndGet();
long delay = Math.min(1000L * (1L << (attempt - 1)), 30000L); // exponential backoff
if (attempt > 5) {
log.warn("Recovery attempts are exceeded.");
return;
}
log.warn("Scheduling recovery attempt {} in {} ms", attempt, delay);
singleThreadScheduledExecutor.schedule(() -> {
try {
if (!(applicationContext instanceof ConfigurableApplicationContext execCtx) || !execCtx.isActive()) {
log.warn("ApplicationContext became inactive before recovery execution; aborting attempt {}.", attempt);
return;
}
log.info("Redis stream consumer recovery task starting (attempt {})...", attempt);
recoverContainer(redisTemplate, redisStreamConsumer);
log.info("Redis stream consumer recovery task completed (attempt {}).", attempt);
recoveryAttempts.set(0);
} catch (Throwable ex) {
log.error("Exception in redis stream consumer recovery task (attempt {}): {}", attempt, ex.getMessage(), ex);
}
}, delay, TimeUnit.MILLISECONDS);
}
private void recoverContainer(final RedisTemplate<String, Object> redisTemplate,
final InboundVoiceCallRedisStreamConsumer redisStreamConsumer) {
try {
log.info("Attempting Redis container recovery...");
if (Objects.nonNull(container) && container.isRunning()) {
container.stop();
log.info("Stopped container");
}
subscriptions.forEach(sub -> {
if (sub.isActive()) sub.cancel();
});
subscriptions.clear();
Thread.sleep(1000);
if (Objects.nonNull(container)) {
container.start();
log.info("Restarted Redis container");
createConsumerGroupSafely(redisTemplate);
registerSubscriptions(redisStreamConsumer); // re-register after restart
log.info("Re-registered Redis subscriptions after recovery");
}
recoveryAttempts.set(0);
} catch (Exception e) {
log.error("Recovery failed: {}", e.getMessage(), e);
}
}
}`
The relevant application properties:
platform.registry.redis.stream.messages.alive.time.ms=0
platform.registry.redis.stream.failed.message.retry.interval.time.ms=10000
platform.registry.redis.stream.failed.message.max.retry.count=0
platform.registry.redis.stream.consumer.polling.interval.ms=500
platform.registry.redis.stream.consumer.messages.per.interval.ms=4
platform.registry.redis.stream.consumer.threads=4
platform.registry.redis.stream.consumer.count=4