Skip to content

Commit d1146e5

Browse files
authored
GH-9368: Adding MqttMessageDrivenChannelAdapter at runtime
Fixes: #9368 * fixing MessageDrivenAdapters 1) exposing isConnection() in ClientManager 2) implementing isConnection() in Mqttv3ClientManager and Mqttv5ClientManager 3) Modifining OnInit() in AbstractMqttMessageDrivenChannelAdapter * Adding test to ClientManagerBackToBackTests Adding tests for addition of MessageDrivenAdapters at runtime. 1) Adding config classes Mqttv3ConfigWithStartedManager, Mqttv5ConfigWithStartedManager. 2) Adding test for config classes above. * Fixing code style * Adding required changes * `ClientManager` - @SInCE 6.4 comment to `isConnecttion()` * `Mqttv3ClientManager`, `Mqttv5ClientManager` - adding local variable and lock logic * `AbstractMqttMessageDrivenChannelAdapter` - changed `connectCompete()` to false * adding runtime ClientManger tests * `testV3ClientManagerRuntime` and `testV5ClientManagerRuntime` adding MessageDrivenAdaptes at runtime using `IntegrationFlowContext` * Adding Documentation for MqttPahoMessageDrivenChannelAdapter and Mqttv5PahoMessageDrivenChannelAdapter * fixing code style * adding required chnages * `ClientManager`, `Mqttv3ClientManager`, `Mqttv5ClientManager`, `AbstractMqttMessageDrivenChannelAdapter` - renamig `isConnection()` to `isConnected()` * fixing docs `mqtt.adoc` and `whats-new.adoc` * `ClientManagerBackToBackTests` adding factory interface `MessageDrivenChannelAdapterFactory` to create adapters * removing started tests
1 parent e90fd68 commit d1146e5

File tree

7 files changed

+190
-6
lines changed

7 files changed

+190
-6
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
*
2929
* @author Artem Vozhdayenko
3030
* @author Artem Bilan
31+
* @author Jiri Soucek
3132
*
3233
* @since 6.0
3334
*/
@@ -68,6 +69,13 @@ public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {
6869
*/
6970
boolean removeCallback(ConnectCallback connectCallback);
7071

72+
/**
73+
* Return the managed clients isConnected.
74+
* @return the managed clients isConnected.
75+
* @since 6.4
76+
*/
77+
boolean isConnected();
78+
7179
/**
7280
* A contract for a custom callback on {@code connectComplete} event from the client.
7381
*

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
* @author Artem Vozhdayenko
3939
* @author Artem Bilan
4040
* @author Christian Tzolov
41+
* @author Jiri Soucek
4142
*
4243
* @since 6.0
4344
*/
@@ -198,4 +199,19 @@ public void deliveryComplete(IMqttDeliveryToken token) {
198199
// nor this manager concern
199200
}
200201

202+
@Override
203+
public boolean isConnected() {
204+
this.lock.lock();
205+
try {
206+
IMqttAsyncClient client = getClient();
207+
if (client != null) {
208+
return client.isConnected();
209+
}
210+
return false;
211+
}
212+
finally {
213+
this.lock.unlock();
214+
}
215+
216+
}
201217
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
* @author Artem Vozhdayenko
4141
* @author Artem Bilan
4242
* @author Christian Tzolov
43+
* @author Jiri Soucek
4344
*
4445
* @since 6.0
4546
*/
@@ -206,4 +207,18 @@ public void mqttErrorOccurred(MqttException exception) {
206207
logger.error("MQTT error occurred", exception);
207208
}
208209

210+
@Override
211+
public boolean isConnected() {
212+
this.lock.lock();
213+
try {
214+
IMqttAsyncClient client = getClient();
215+
if (client != null) {
216+
return client.isConnected();
217+
}
218+
return false;
219+
}
220+
finally {
221+
this.lock.unlock();
222+
}
223+
}
209224
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -49,6 +49,7 @@
4949
* @author Trung Pham
5050
* @author Mikhail Polivakha
5151
* @author Artem Vozhdayenko
52+
* @author Jiri Soucek
5253
*
5354
* @since 4.0
5455
*
@@ -203,6 +204,9 @@ protected void onInit() {
203204
super.onInit();
204205
if (this.clientManager != null) {
205206
this.clientManager.addCallback(this);
207+
if (this.clientManager.isConnected()) {
208+
connectComplete(false);
209+
}
206210
}
207211
}
208212

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,17 @@
2424
import org.eclipse.paho.client.mqttv3.MqttException;
2525
import org.junit.jupiter.api.Test;
2626

27+
import org.springframework.context.ApplicationContext;
2728
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
2829
import org.springframework.context.annotation.Bean;
2930
import org.springframework.context.annotation.Configuration;
3031
import org.springframework.context.event.EventListener;
3132
import org.springframework.integration.annotation.ServiceActivator;
33+
import org.springframework.integration.channel.QueueChannel;
3234
import org.springframework.integration.config.EnableIntegration;
3335
import org.springframework.integration.dsl.IntegrationFlow;
36+
import org.springframework.integration.dsl.context.IntegrationFlowContext;
37+
import org.springframework.integration.endpoint.MessageProducerSupport;
3438
import org.springframework.integration.mqtt.core.Mqttv3ClientManager;
3539
import org.springframework.integration.mqtt.core.Mqttv5ClientManager;
3640
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
@@ -70,12 +74,24 @@ void testV3ClientManagerReconnect() throws Exception {
7074
Mqttv3ConfigWithDisconnect.subscribedLatch);
7175
}
7276

77+
@Test
78+
void testV3ClientManagerRuntime() throws Exception {
79+
testSubscribeAndPublishRuntime(Mqttv3ConfigRuntime.class, Mqttv3ConfigRuntime.TOPIC_NAME,
80+
Mqttv3ConfigRuntime.subscribedLatch);
81+
}
82+
7383
@Test
7484
void testV5ClientManagerReconnect() throws Exception {
7585
testSubscribeAndPublish(Mqttv5ConfigWithDisconnect.class, Mqttv5ConfigWithDisconnect.TOPIC_NAME,
7686
Mqttv5ConfigWithDisconnect.subscribedLatch);
7787
}
7888

89+
@Test
90+
void testV5ClientManagerRuntime() throws Exception {
91+
testSubscribeAndPublishRuntime(Mqttv5ConfigRuntime.class, Mqttv5ConfigRuntime.TOPIC_NAME,
92+
Mqttv5ConfigRuntime.subscribedLatch);
93+
}
94+
7995
private void testSubscribeAndPublish(Class<?> configClass, String topicName, CountDownLatch subscribedLatch)
8096
throws Exception {
8197

@@ -102,6 +118,39 @@ private void testSubscribeAndPublish(Class<?> configClass, String topicName, Cou
102118
}
103119
}
104120

121+
private void testSubscribeAndPublishRuntime(Class<?> configClass, String topicName, CountDownLatch subscribedLatch)
122+
throws Exception {
123+
124+
try (var ctx = new AnnotationConfigApplicationContext(configClass)) {
125+
// given
126+
var input = ctx.getBean("mqttOutFlow.input", MessageChannel.class);
127+
var flowContext = ctx.getBean(IntegrationFlowContext.class);
128+
var factory = ctx.getBean(MessageDrivenChannelAdapterFactory.class);
129+
var output = new QueueChannel();
130+
131+
flowContext.registration(IntegrationFlow
132+
.from(factory.createMessageDrivenAdapter(ctx))
133+
.channel(output)
134+
.get()).register();
135+
String testPayload = "foo";
136+
assertThat(subscribedLatch.await(20, TimeUnit.SECONDS)).isTrue();
137+
138+
// when
139+
input.send(MessageBuilder.withPayload(testPayload).setHeader(MqttHeaders.TOPIC, topicName).build());
140+
Message<?> receive = output.receive(20_000);
141+
142+
// then
143+
assertThat(receive).isNotNull();
144+
Object payload = receive.getPayload();
145+
if (payload instanceof String sp) {
146+
assertThat(sp).isEqualTo(testPayload);
147+
}
148+
else {
149+
assertThat(payload).isEqualTo(testPayload.getBytes(StandardCharsets.UTF_8));
150+
}
151+
}
152+
}
153+
105154
@Configuration
106155
@EnableIntegration
107156
public static class Mqttv3Config {
@@ -177,6 +226,39 @@ public IntegrationFlow mqttInFlow(Mqttv3ClientManager mqttv3ClientManager) {
177226

178227
}
179228

229+
@Configuration
230+
@EnableIntegration
231+
public static class Mqttv3ConfigRuntime implements MessageDrivenChannelAdapterFactory {
232+
233+
static final String TOPIC_NAME = "test-topic-v3";
234+
235+
static final CountDownLatch subscribedLatch = new CountDownLatch(1);
236+
237+
@EventListener
238+
public void onSubscribed(MqttSubscribedEvent e) {
239+
subscribedLatch.countDown();
240+
}
241+
242+
@Bean
243+
public Mqttv3ClientManager mqttv3ClientManager() {
244+
MqttConnectOptions connectionOptions = new MqttConnectOptions();
245+
connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()});
246+
connectionOptions.setAutomaticReconnect(true);
247+
return new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3");
248+
}
249+
250+
@Bean
251+
public IntegrationFlow mqttOutFlow(Mqttv3ClientManager mqttv3ClientManager) {
252+
return f -> f.handle(new MqttPahoMessageHandler(mqttv3ClientManager));
253+
}
254+
255+
@Override
256+
public MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx) {
257+
var clientManager = ctx.getBean(Mqttv3ClientManager.class);
258+
return new MqttPahoMessageDrivenChannelAdapter(clientManager, TOPIC_NAME);
259+
}
260+
}
261+
180262
@Configuration
181263
@EnableIntegration
182264
public static class Mqttv5Config {
@@ -247,6 +329,41 @@ public IntegrationFlow mqttInFlow(Mqttv5ClientManager mqttv5ClientManager) {
247329

248330
}
249331

332+
@Configuration
333+
@EnableIntegration
334+
public static class Mqttv5ConfigRuntime implements MessageDrivenChannelAdapterFactory {
335+
336+
static final String TOPIC_NAME = "test-topic-v5";
337+
338+
static final CountDownLatch subscribedLatch = new CountDownLatch(1);
339+
340+
@EventListener
341+
public void onSubscribed(MqttSubscribedEvent e) {
342+
subscribedLatch.countDown();
343+
}
344+
345+
@Bean
346+
public Mqttv5ClientManager mqttv5ClientManager() {
347+
return new Mqttv5ClientManager(MosquittoContainerTest.mqttUrl(), "client-manager-client-id-v5");
348+
}
349+
350+
@Bean
351+
@ServiceActivator(inputChannel = "mqttOutFlow.input")
352+
public Mqttv5PahoMessageHandler mqttv5PahoMessageHandler(Mqttv5ClientManager mqttv5ClientManager) {
353+
return new Mqttv5PahoMessageHandler(mqttv5ClientManager);
354+
}
355+
356+
@Override
357+
public MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx) {
358+
var clientManager = ctx.getBean(Mqttv5ClientManager.class);
359+
return new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, TOPIC_NAME);
360+
}
361+
}
362+
363+
interface MessageDrivenChannelAdapterFactory {
364+
MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx);
365+
}
366+
250367
record ClientV3Disconnector(Mqttv3ClientManager clientManager) {
251368

252369
@EventListener(MqttSubscribedEvent.class)

src/reference/antora/modules/ROOT/pages/mqtt.adoc

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -382,9 +382,9 @@ public class MqttJavaApplication {
382382
.run(args);
383383
}
384384
385-
@Bean
386-
public IntegrationFlow mqttOutboundFlow() {
387-
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
385+
@Bean
386+
public IntegrationFlow mqttOutboundFlow() {
387+
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
388388
}
389389
390390
}
@@ -548,3 +548,19 @@ public IntegrationFlow mqttOutFlow(
548548
return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
549549
}
550550
----
551+
552+
NOTE: Starting with version 6.4, multiple instances of `MqttPahoMessageDrivenChannelAdapter` and `Mqttv5PahoMessageDrivenChannelAdapter` can now be added at runtime using corresponding `ClientManager` through `IntegrationFlowContext`
553+
554+
[source,java]
555+
----
556+
private void addAddRuntimeAdapter(IntegrationFlowContext flowContext, Mqttv5ClientManager clientManager,
557+
String topic, MessageChannel channel) {
558+
flowContext
559+
.registration(
560+
IntegrationFlow
561+
.from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
562+
.channel(channel)
563+
.get())
564+
.register();
565+
}
566+
----

src/reference/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,16 @@ See xref:redis.adoc[Redis Support] for more information.
6161
The `ControlBusFactoryBean` (and respective `<int-groovy:control-bus>` XML tag) has been deprecated (for removal) in favor of new introduced `ControlBusFactoryBean` based on a new model implemented in the `ControlBusCommandRegistry`.
6262
See xref:control-bus.adoc[Control Bus] for more information.
6363

64+
6465
[[x6.4-sftp-changes]]
6566
=== SFTP Support Changes
6667

6768
The `DefaultSftpSessionFactory` now exposes a `Consumer<SshClient>` configurer property to further customize an internal `SshClient`.
68-
See xref:sftp/session-factory.adoc[SFTP Session Factory] for more information.
69+
See xref:sftp/session-factory.adoc[SFTP Session Factory] for more information.
70+
71+
[[x6.4-mqtt-support-changes]]
72+
=== MQTT Support Changes
73+
74+
Multiple instances of `MqttPahoMessageDrivenChannelAdapter` and `Mqttv5PahoMessageDrivenChannelAdapter` can now be added at runtime using corresponding `ClientManager` through `IntegrationFlowContext`
75+
See xref:mqtt.adoc[MQTT Support] for more information.
76+

0 commit comments

Comments
 (0)