-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Improve thread safety of RedisMessageListenerContainer. #2299
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
RedisMessageListenerContainer relies on 2 threads for subscription when patterns and channels topics are present. With Jedis, since the subscription thread blocks while listening for messages, an additional thread is used to subscribe to patterns while the subscription threads subscribe to channels and block. There were some race conditions between those two threads that could corrupt the Jedis stream since operations are not synchronized in JedisSubscription. A lock on the JedisSubscription instance has been added to enforce that operations on the Jedis stream cannot be affected by a concurrent thread. Additionaly, there were no error handling and retry mechanism on the pattern subscription thread. Multiple conditions could trigger an unexpected behavior here, exceptions were not handled and logged to stderr with no notice. Also, if the connection was not subscribed after 3 tries, the thread would exit silently with no log. Defensive measure have been added to retry redis connection failures and the subscription will now retry indefinitely, unless canceled on shutdown and on the main subscription thread errors. Fixes spring-projects#964 for versions before spring-projects#2256 was introduced.
} | ||
if (!getPatterns().isEmpty()) { | ||
jedisPubSub.punsubscribe(); | ||
synchronized (this) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this class, synchronization would technically not be necessary alongside my other changes. But I figure it's not a bad idea to add it in case there's a race condition somewhere else in Spring code that I've missed.
This class is not thread safe since commands like jedisPubSub::unsubscribe
write to the non synchronized connection OutputStream
then flush it after. Therefore, if it's accessed by many threads, you can pollute the OutputStream
.
@@ -69,6 +70,7 @@ | |||
* @author Way Joke | |||
* @author Thomas Darimont | |||
* @author Mark Paluch | |||
* @author Jacques-Etienne Beaudet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me know if my changes aren't worthy enough to be here, I don't really mind I was mainly following the contributions guideline!
@@ -689,36 +690,55 @@ private class SubscriptionTask implements SchedulingAwareRunnable { | |||
private class PatternSubscriptionTask implements SchedulingAwareRunnable { | |||
|
|||
private long WAIT = 500; | |||
private long ROUNDS = 3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pattern subscription thread will now run until it's canceled or if it's done.
|
||
public boolean isLongLived() { | ||
return false; | ||
} | ||
|
||
void cancel() { | ||
isThreadRunning.set(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is called exclusively by the SubscriptionTask
when there is a RedisConnectionFailureException
or when the container is stopped.
Thread.currentThread().interrupt(); | ||
try { | ||
if (connection.isSubscribed()) { | ||
synchronized (localMonitor) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is synchronized by localMonitor
to make sure the thread exit cleanly when canceled by the SubscriptionTask
. This should makes the shutdown more graceful and ordered if the container is shutdown while booting and subscription to patterns
} | ||
} | ||
} catch(Throwable e) { | ||
if (e instanceof RedisConnectionFailureException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
connection errors will now be retried, like it is on the SubscriptionTask
. This could sneakily exit before and without proper exception handling, the stack would be written to the application stderr
and could go unnoticed.
sleepBeforeRecoveryAttempt(); | ||
} | ||
} else { | ||
logger.error("PatternSubscriptionTask aborted with exception:", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
non redis connection errors will exit the thread like SubscriptionTask
try { | ||
Thread.sleep(WAIT); | ||
} catch (InterruptedException ex) { | ||
logger.info("PatternSubscriptionTask was interrupted, exiting."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added info log to make sure debugging is not too hard if you're chasing what's up, let me know if you want this in debug/trace but I figure it shouldn't hit much in a real life scenario.
@@ -759,6 +779,7 @@ public void run() { | |||
} | |||
} | |||
} catch (Throwable t) { | |||
cancelPatternSubscriptionTask(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this whole code block is rerun on error, I cancel the task here so that everything is clean for the next run. Since it's synchronized by localMonitor
, everything should be ordered.
I'm a bit reluctant to backport these changes, especially towards a version that is GA since over a year. Changes to The actual problem with Jedis and I strongly argue to leave the revised implementation on the |
Thanks for you insight. While I agree on the GA part, to me, it's still a bug and bugs should be fixed. Especially this one since you could have a pattern subscription fail completely silently or a corrupted jedis stream. Also, version 2.5.x is the one managed from Spring Boot 2.5.x which is still in support for another month and EoL in August 2023 if my math is correct. Even Spring Boot 2.6.x, which is the current stable version, does not pull on the 2.7.x branch. This means that everyone using Spring Boot with a released version is currently affected. I'd also argue that since Spring Session is using the problematic code path (having a topic + pattern subscription), it affects even more users and could potentially affect users that rely on I guess we could shuffle things a bit to reduce the footprint of the changes. Basically this PR fixes :
Just synchronizing Unless you have another idea? Thanks again |
WDYT @mp911de ? Thanks |
Version 2.5 is no longer maintained. |
Hi,
As instructed in #964, a PR to fix the issues I've encountered in RedisMessageListenerContainer.
I've commented the PR inline to ease the comprehension. Let me know if you need further details.
Testing those kind of multithreaded behavior is never easy but I believe all my additions have been tested in the unit tests I've added.