16
16
17
17
package org .springframework .integration .mqtt ;
18
18
19
+ import java .lang .reflect .Constructor ;
19
20
import java .nio .charset .StandardCharsets ;
21
+ import java .util .Arrays ;
20
22
import java .util .concurrent .CountDownLatch ;
21
23
import java .util .concurrent .TimeUnit ;
22
24
29
31
import org .springframework .context .annotation .Configuration ;
30
32
import org .springframework .context .event .EventListener ;
31
33
import org .springframework .integration .annotation .ServiceActivator ;
34
+ import org .springframework .integration .channel .QueueChannel ;
32
35
import org .springframework .integration .config .EnableIntegration ;
33
36
import org .springframework .integration .dsl .IntegrationFlow ;
37
+ import org .springframework .integration .dsl .context .IntegrationFlowContext ;
38
+ import org .springframework .integration .endpoint .MessageProducerSupport ;
39
+ import org .springframework .integration .mqtt .core .ClientManager ;
34
40
import org .springframework .integration .mqtt .core .Mqttv3ClientManager ;
35
41
import org .springframework .integration .mqtt .core .Mqttv5ClientManager ;
36
42
import org .springframework .integration .mqtt .event .MqttSubscribedEvent ;
@@ -76,6 +82,12 @@ void testV3ClientManagerStarted() throws Exception {
76
82
Mqttv3ConfigWithStartedManager .subscribedLatch );
77
83
}
78
84
85
+ @ Test
86
+ void testV3ClientManagerRuntime () throws Exception {
87
+ testSubscribeAndPublishRuntime (Mqttv3ConfigRuntime .class , Mqttv3ConfigRuntime .TOPIC_NAME ,
88
+ Mqttv3ConfigRuntime .subscribedLatch , Mqttv3ConfigRuntime .adapter );
89
+ }
90
+
79
91
@ Test
80
92
void testV5ClientManagerReconnect () throws Exception {
81
93
testSubscribeAndPublish (Mqttv5ConfigWithDisconnect .class , Mqttv5ConfigWithDisconnect .TOPIC_NAME ,
@@ -88,6 +100,12 @@ void testV5ClientManagerStarted() throws Exception {
88
100
Mqttv5ConfigWithStartedManager .subscribedLatch );
89
101
}
90
102
103
+ @ Test
104
+ void testV5ClientManagerRuntime () throws Exception {
105
+ testSubscribeAndPublishRuntime (Mqttv5ConfigRuntime .class , Mqttv5ConfigRuntime .TOPIC_NAME ,
106
+ Mqttv5ConfigRuntime .subscribedLatch , Mqttv5ConfigRuntime .adapter );
107
+ }
108
+
91
109
private void testSubscribeAndPublish (Class <?> configClass , String topicName , CountDownLatch subscribedLatch )
92
110
throws Exception {
93
111
@@ -114,6 +132,40 @@ private void testSubscribeAndPublish(Class<?> configClass, String topicName, Cou
114
132
}
115
133
}
116
134
135
+ private void testSubscribeAndPublishRuntime (Class <?> configClass , String topicName , CountDownLatch subscribedLatch , Class <?> adapter )
136
+ throws Exception {
137
+
138
+ try (var ctx = new AnnotationConfigApplicationContext (configClass )) {
139
+ // given
140
+ var input = ctx .getBean ("mqttOutFlow.input" , MessageChannel .class );
141
+ var flowContext = ctx .getBean (IntegrationFlowContext .class );
142
+ var clientManager = ctx .getBean (ClientManager .class );
143
+ var output = new QueueChannel ();
144
+ Class <?>[] parameterTypes = {ClientManager .class , String [].class };
145
+ Constructor <?> declaredConstructor = adapter .getConstructor (parameterTypes );
146
+ flowContext .registration (IntegrationFlow
147
+ .from ((MessageProducerSupport ) declaredConstructor .newInstance (clientManager ,new String [] {topicName }))
148
+ .channel (output )
149
+ .get ()).register ();
150
+ String testPayload = "foo" ;
151
+ assertThat (subscribedLatch .await (20 , TimeUnit .SECONDS )).isTrue ();
152
+
153
+ // when
154
+ input .send (MessageBuilder .withPayload (testPayload ).setHeader (MqttHeaders .TOPIC , topicName ).build ());
155
+ Message <?> receive = output .receive (20_000 );
156
+
157
+ // then
158
+ assertThat (receive ).isNotNull ();
159
+ Object payload = receive .getPayload ();
160
+ if (payload instanceof String sp ) {
161
+ assertThat (sp ).isEqualTo (testPayload );
162
+ }
163
+ else {
164
+ assertThat (payload ).isEqualTo (testPayload .getBytes (StandardCharsets .UTF_8 ));
165
+ }
166
+ }
167
+ }
168
+
117
169
@ Configuration
118
170
@ EnableIntegration
119
171
public static class Mqttv3Config {
@@ -208,6 +260,7 @@ public Mqttv3ClientManager mqttv3ClientManager() {
208
260
connectionOptions .setServerURIs (new String [] {MosquittoContainerTest .mqttUrl ()});
209
261
connectionOptions .setAutomaticReconnect (true );
210
262
Mqttv3ClientManager manager = new Mqttv3ClientManager (connectionOptions , "client-manager-client-id-v3" );
263
+ manager .start ();
211
264
return manager ;
212
265
}
213
266
@@ -223,6 +276,35 @@ public IntegrationFlow mqttInFlow(Mqttv3ClientManager mqttv3ClientManager) {
223
276
.get ();
224
277
}
225
278
279
+ }
280
+ @ Configuration
281
+ @ EnableIntegration
282
+ public static class Mqttv3ConfigRuntime {
283
+
284
+ static final String TOPIC_NAME = "test-topic-v3" ;
285
+
286
+ static final CountDownLatch subscribedLatch = new CountDownLatch (1 );
287
+
288
+ static final Class <?> adapter = MqttPahoMessageDrivenChannelAdapter .class ;
289
+
290
+ @ EventListener
291
+ public void onSubscribed (MqttSubscribedEvent e ) {
292
+ subscribedLatch .countDown ();
293
+ }
294
+
295
+ @ Bean
296
+ public Mqttv3ClientManager mqttv3ClientManager () {
297
+ MqttConnectOptions connectionOptions = new MqttConnectOptions ();
298
+ connectionOptions .setServerURIs (new String [] {MosquittoContainerTest .mqttUrl ()});
299
+ connectionOptions .setAutomaticReconnect (true );
300
+ return new Mqttv3ClientManager (connectionOptions , "client-manager-client-id-v3" );
301
+ }
302
+
303
+ @ Bean
304
+ public IntegrationFlow mqttOutFlow (Mqttv3ClientManager mqttv3ClientManager ) {
305
+ return f -> f .handle (new MqttPahoMessageHandler (mqttv3ClientManager ));
306
+ }
307
+
226
308
}
227
309
228
310
@ Configuration
@@ -328,6 +410,33 @@ public IntegrationFlow mqttInFlow(Mqttv5ClientManager mqttv5ClientManager) {
328
410
.get ();
329
411
}
330
412
413
+ }
414
+ @ Configuration
415
+ @ EnableIntegration
416
+ public static class Mqttv5ConfigRuntime {
417
+
418
+ static final String TOPIC_NAME = "test-topic-v5" ;
419
+
420
+ static final CountDownLatch subscribedLatch = new CountDownLatch (1 );
421
+
422
+ static final Class <?> adapter = Mqttv5PahoMessageDrivenChannelAdapter .class ;
423
+
424
+ @ EventListener
425
+ public void onSubscribed (MqttSubscribedEvent e ) {
426
+ subscribedLatch .countDown ();
427
+ }
428
+
429
+ @ Bean
430
+ public Mqttv5ClientManager mqttv5ClientManager () {
431
+ return new Mqttv5ClientManager (MosquittoContainerTest .mqttUrl (), "client-manager-client-id-v5" );
432
+ }
433
+
434
+ @ Bean
435
+ @ ServiceActivator (inputChannel = "mqttOutFlow.input" )
436
+ public Mqttv5PahoMessageHandler mqttv5PahoMessageHandler (Mqttv5ClientManager mqttv5ClientManager ) {
437
+ return new Mqttv5PahoMessageHandler (mqttv5ClientManager );
438
+ }
439
+
331
440
}
332
441
333
442
record ClientV3Disconnector (Mqttv3ClientManager clientManager ) {
0 commit comments