Skip to content

Commit 52663b6

Browse files
SheheryarAamirSheheryar
andauthored
fix: updated CloudStreamFunctionChannelsScanner scan to merge channel (#416)
fixes #415 Co-authored-by: Sheheryar <[email protected]>
1 parent 765ecd3 commit 52663b6

File tree

2 files changed

+62
-4
lines changed

2 files changed

+62
-4
lines changed

springwolf-plugins/springwolf-cloud-stream-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/cloudstream/CloudStreamFunctionChannelsScanner.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.asyncapi.v2._6_0.model.server.Server;
77
import com.asyncapi.v2.binding.message.MessageBinding;
88
import io.github.stavshamir.springwolf.asyncapi.scanners.beans.BeanMethodsScanner;
9+
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelMerger;
910
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner;
1011
import io.github.stavshamir.springwolf.asyncapi.types.channel.bindings.EmptyChannelBinding;
1112
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.bindings.EmptyOperationBinding;
@@ -24,8 +25,7 @@
2425
import java.lang.reflect.Method;
2526
import java.util.Map;
2627
import java.util.Set;
27-
28-
import static java.util.stream.Collectors.toMap;
28+
import java.util.stream.Collectors;
2929

3030
@Slf4j
3131
@RequiredArgsConstructor
@@ -39,12 +39,12 @@ public class CloudStreamFunctionChannelsScanner implements ChannelsScanner {
3939
@Override
4040
public Map<String, ChannelItem> scan() {
4141
Set<Method> beanMethods = beanMethodsScanner.getBeanMethods();
42-
return beanMethods.stream()
42+
return ChannelMerger.merge(beanMethods.stream()
4343
.map(FunctionalChannelBeanData::fromMethodBean)
4444
.flatMap(Set::stream)
4545
.filter(this::isChannelBean)
4646
.map(this::toChannelEntry)
47-
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
47+
.collect(Collectors.toList()));
4848
}
4949

5050
private boolean isChannelBean(FunctionalChannelBeanData beanData) {

springwolf-plugins/springwolf-cloud-stream-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/cloudstream/CloudStreamFunctionChannelsScannerIntegrationTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,64 @@ void testKStreamFunctionBinding() {
271271
.contains(Map.entry(inputTopicName, publishChannel), Map.entry(outputTopicName, subscribeChannel));
272272
}
273273

274+
@Test
275+
void testFunctionBindingWithSameTopicName() {
276+
// Given a binding "spring.cloud.stream.bindings.testFunction-in-0.destination=test-topic"
277+
// And a binding "spring.cloud.stream.bindings.testFunction-out-0.destination=test-topic"
278+
String topicName = "test-topic";
279+
BindingProperties testFunctionInBinding = new BindingProperties();
280+
testFunctionInBinding.setDestination(topicName);
281+
282+
BindingProperties testFunctionOutBinding = new BindingProperties();
283+
testFunctionOutBinding.setDestination(topicName);
284+
when(bindingServiceProperties.getBindings())
285+
.thenReturn(Map.of(
286+
"testFunction-in-0", testFunctionInBinding,
287+
"testFunction-out-0", testFunctionOutBinding));
288+
289+
// When scan is called
290+
Map<String, ChannelItem> channels = scanner.scan();
291+
292+
// Then the returned merged channels contain a publish operation and a subscribe operation
293+
Message subscribeMessage = Message.builder()
294+
.name(Integer.class.getName())
295+
.title(Integer.class.getSimpleName())
296+
.payload(PayloadReference.fromModelName(Integer.class.getSimpleName()))
297+
.headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName()))
298+
.bindings(messageBinding)
299+
.build();
300+
301+
Operation subscribeOperation = Operation.builder()
302+
.bindings(operationBinding)
303+
.description("Auto-generated description")
304+
.operationId("test-topic_subscribe_testFunction")
305+
.message(subscribeMessage)
306+
.build();
307+
308+
Message publishMessage = Message.builder()
309+
.name(String.class.getName())
310+
.title(String.class.getSimpleName())
311+
.payload(PayloadReference.fromModelName(String.class.getSimpleName()))
312+
.headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName()))
313+
.bindings(messageBinding)
314+
.build();
315+
316+
Operation publishOperation = Operation.builder()
317+
.bindings(operationBinding)
318+
.description("Auto-generated description")
319+
.operationId("test-topic_publish_testFunction")
320+
.message(publishMessage)
321+
.build();
322+
323+
ChannelItem mergedChannel = ChannelItem.builder()
324+
.bindings(channelBinding)
325+
.publish(publishOperation)
326+
.subscribe(subscribeOperation)
327+
.build();
328+
329+
assertThat(channels).contains(Map.entry(topicName, mergedChannel));
330+
}
331+
274332
@TestConfiguration
275333
public static class Configuration {
276334

0 commit comments

Comments
 (0)