Skip to content

Commit a4cadc9

Browse files
artembilantzolov
authored andcommitted
Fix PostgresSubscribableChannel.notifyUpdate()
When transaction is configured for the `PostgresSubscribableChannel.notifyUpdate()` and it is rolled back, the next poll in that loop will return the same message. Again and again if transaction is always rolled back. This leads to the condition when we never leave this loop even if we fully unsubscribed from this channel. The issue has need spotted after introducing `SKIP LOCKED` for `PostgresChannelMessageStoreQueryProvider` which leads to the locked record in DB in the mentioned above transaction. * Introduce `PostgresSubscribableChannel.hasHandlers` flag to check in the `notifyUpdate()` before performing poll query in DB. **Cherry-pick to `6.1.x` & `6.0.x`**
1 parent ccaa853 commit a4cadc9

File tree

2 files changed

+28
-14
lines changed

2 files changed

+28
-14
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
6767

6868
private Executor executor = new SimpleAsyncTaskExecutor();
6969

70+
private volatile boolean hasHandlers;
71+
7072
/**
7173
* Create a subscribable channel for a Postgres database.
7274
* @param jdbcChannelMessageStore The message store to use for the relevant region.
@@ -120,6 +122,7 @@ public boolean subscribe(MessageHandler handler) {
120122
boolean subscribed = super.subscribe(handler);
121123
if (this.dispatcher.getHandlerCount() == 1) {
122124
this.messageTableSubscriber.subscribe(this);
125+
this.hasHandlers = true;
123126
notifyUpdate();
124127
}
125128
return subscribed;
@@ -130,6 +133,7 @@ public boolean unsubscribe(MessageHandler handle) {
130133
boolean unsubscribed = super.unsubscribe(handle);
131134
if (this.dispatcher.getHandlerCount() == 0) {
132135
this.messageTableSubscriber.unsubscribe(this);
136+
this.hasHandlers = false;
133137
}
134138
return unsubscribed;
135139
}
@@ -151,18 +155,7 @@ public void notifyUpdate() {
151155
try {
152156
Optional<Message<?>> dispatchedMessage;
153157
do {
154-
if (this.transactionTemplate != null) {
155-
dispatchedMessage =
156-
this.retryTemplate.execute(context ->
157-
this.transactionTemplate.execute(status ->
158-
pollMessage()
159-
.map(this::dispatch)));
160-
}
161-
else {
162-
dispatchedMessage =
163-
pollMessage()
164-
.map(message -> this.retryTemplate.execute(context -> dispatch(message)));
165-
}
158+
dispatchedMessage = askForMessage();
166159
} while (dispatchedMessage.isPresent());
167160
}
168161
catch (Exception ex) {
@@ -171,6 +164,20 @@ public void notifyUpdate() {
171164
});
172165
}
173166

167+
private Optional<Message<?>> askForMessage() {
168+
if (this.hasHandlers) {
169+
if (this.transactionTemplate != null) {
170+
return this.retryTemplate.execute(context ->
171+
this.transactionTemplate.execute(status -> pollMessage().map(this::dispatch)));
172+
}
173+
else {
174+
return pollMessage()
175+
.map(message -> this.retryTemplate.execute(context -> dispatch(message)));
176+
}
177+
}
178+
return Optional.empty();
179+
}
180+
174181
private Optional<Message<?>> pollMessage() {
175182
return Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId));
176183
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
4646
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
4747
import org.springframework.jdbc.datasource.init.ScriptUtils;
48+
import org.springframework.messaging.MessageHandler;
4849
import org.springframework.messaging.support.GenericMessage;
4950
import org.springframework.retry.support.RetryTemplate;
5051
import org.springframework.test.annotation.DirtiesContext;
@@ -169,20 +170,26 @@ void testMessagesDispatchedInTransaction() throws InterruptedException {
169170
postgresSubscribableChannel.setTransactionManager(transactionManager);
170171

171172
postgresChannelMessageTableSubscriber.start();
172-
postgresSubscribableChannel.subscribe(message -> {
173+
MessageHandler messageHandler =
174+
message -> {
173175
try {
174176
throw new RuntimeException("An error has occurred");
175177
}
176178
finally {
177179
latch.countDown();
178180
}
179-
});
181+
};
182+
postgresSubscribableChannel.subscribe(messageHandler);
180183

181184
messageStore.addMessageToGroup(groupId, new GenericMessage<>("1"));
182185
messageStore.addMessageToGroup(groupId, new GenericMessage<>("2"));
183186

184187
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
185188

189+
// Stop subscriber to unlock records from TX for the next verification
190+
postgresChannelMessageTableSubscriber.stop();
191+
postgresSubscribableChannel.unsubscribe(messageHandler);
192+
186193
assertThat(messageStore.messageGroupSize(groupId)).isEqualTo(2);
187194
assertThat(messageStore.pollMessageFromGroup(groupId).getPayload()).isEqualTo("1");
188195
assertThat(messageStore.pollMessageFromGroup(groupId).getPayload()).isEqualTo("2");

0 commit comments

Comments
 (0)