Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,14 @@ public class MessageObject extends ExtendableObject implements Message {
*/
@JsonProperty(value = "traits")
private List<MessageTrait> traits;

/*
* Override the getMessageId to guarantee that there's always a value. Defaults to 'name'
*/
public String getMessageId() {
if (messageId == null) {
return this.name;
}
return messageId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,10 @@ public String getRef() {
/**
* Convenient Builder to create a Message reference to an existing Message
* @param message Message to create the reference to. This Message MUST have a 'messageId' field
* @return a Message with the 'ref' field pointing to "#/components/messages/{messageId"
* @return a Message with the 'ref' field pointing to "#/components/messages/{messageName}"
*/
public static MessageReference fromMessage(MessageObject message) {
var messageId = message.getMessageId();
if (messageId == null) {
throw new IllegalArgumentException("The message must have a 'messageId' defined");
}
return new MessageReference("#/components/messages/" + messageId);
return fromMessage(message.getName());
}

public static MessageReference fromMessage(String messageName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void shouldCreateSimpleAsyncAPI() throws IOException {
var channelUserSignedup = ChannelObject.builder()
.channelId("userSignedup")
.address("user/signedup")
.messages(Map.of(userSignUpMessage.getMessageId(), MessageReference.fromMessage(userSignUpMessage)))
.messages(Map.of(userSignUpMessage.getMessageId(), MessageReference.fromMessage("UserSignedUp")))
.build();

AsyncAPI asyncAPI = AsyncAPI.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package io.github.stavshamir.springwolf.asyncapi;

import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.operation.Operation;

import java.util.Map;

Expand All @@ -11,9 +12,16 @@
public interface ChannelsService {

/**
* Detects all available AsyncAPI ChannelItem in the spring context.
* Detects all available AsyncAPI ChannelObject in the spring context.
*
* @return Map of channel names mapping to detected ChannelItems
*/
Map<String, ChannelObject> findChannels();

/**
* Detects all available AsyncAPI Operation in the spring context.
*
* @return Map of operation names mapping to detected Operations
*/
Map<String, Operation> findOperations();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.github.stavshamir.springwolf.asyncapi.types.AsyncAPI;
import io.github.stavshamir.springwolf.asyncapi.types.Components;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.operation.Operation;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.schemas.SchemasService;
Expand Down Expand Up @@ -64,6 +65,8 @@ protected synchronized void initAsyncAPI() {
// SchemasService.
Map<String, ChannelObject> channels = channelsService.findChannels();

Map<String, Operation> operations = channelsService.findOperations();

Components components = Components.builder()
.schemas(schemasService.getDefinitions())
.build();
Expand All @@ -74,6 +77,7 @@ protected synchronized void initAsyncAPI() {
.defaultContentType(docket.getDefaultContentType())
.servers(docket.getServers())
.channels(channels)
.operations(operations)
.components(components)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelMerger;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.operation.Operation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -31,12 +32,28 @@ public Map<String, ChannelObject> findChannels() {

for (ChannelsScanner scanner : channelsScanners) {
try {
Map<String, ChannelObject> channels = scanner.scan();
Map<String, ChannelObject> channels = scanner.scanChannels();
foundChannelItems.addAll(channels.entrySet());
} catch (Exception e) {
log.error("An error was encountered during channel scanning with {}: {}", scanner, e.getMessage(), e);
}
}
return ChannelMerger.merge(foundChannelItems);
return ChannelMerger.mergeChannels(foundChannelItems);
}

// FIXME
@Override
public Map<String, Operation> findOperations() {
List<Map.Entry<String, Operation>> foundOperations = new ArrayList<>();
for (ChannelsScanner scanner : channelsScanners) {
try {
Map<String, Operation> channels = scanner.scanOperations();
foundOperations.addAll(channels.entrySet());
} catch (Exception e) {
log.error("An error was encountered during operation scanning with {}: {}", scanner, e.getMessage(), e);
}
}

return ChannelMerger.mergeOperations(foundOperations);
}
}
Original file line number Diff line number Diff line change
@@ -1,51 +1,40 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.stavshamir.springwolf.asyncapi;

import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.Message;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.MessageObject;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

@Slf4j
public class MessageHelper {
private static final String ONE_OF = "oneOf";

private static final Comparator<MessageObject> byMessageName = Comparator.comparing(MessageObject::getName);

private static final Supplier<Set<MessageObject>> messageSupplier = () -> new TreeSet<>(byMessageName);

public static Object toMessageObjectOrComposition(Set<MessageObject> messages) {
return switch (messages.size()) {
case 0 -> throw new IllegalArgumentException("messages must not be empty");
case 1 -> messages.toArray()[0];
default -> Map.of(
ONE_OF, new ArrayList<>(messages.stream().collect(Collectors.toCollection(messageSupplier))));
};
}
private MessageHelper() {}

@SuppressWarnings("unchecked")
public static Set<MessageObject> messageObjectToSet(Object messageObject) {
if (messageObject instanceof MessageObject message) {
return new HashSet<>(Collections.singletonList(message));
public static Map<String, Message> toMessagesMap(Set<MessageObject> messages) {
if (messages.isEmpty()) {
throw new IllegalArgumentException("messages must not be empty");
}

if (messageObject instanceof Map) {
List<MessageObject> messages = ((Map<String, List<MessageObject>>) messageObject).get(ONE_OF);
return new HashSet<>(messages);
}
return new ArrayList<>(messages.stream().collect(Collectors.toCollection(messageSupplier)))
.stream().collect(Collectors.toMap(MessageObject::getMessageId, Function.identity()));
}

log.warn(
"Message object must contain either a Message or a Map<String, Set<Message>, but contained: {}",
messageObject.getClass());
return new HashSet<>();
// FIXME: Do we need this method?
@SuppressWarnings("unchecked")
public static Set<Message> messageObjectToSet(Map<String, Message> messages) {
return new HashSet<>(messages.values());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import io.github.stavshamir.springwolf.asyncapi.MessageHelper;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.Channel;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.Message;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.MessageObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.MessageReference;
import io.github.stavshamir.springwolf.asyncapi.v3.model.operation.Operation;

import java.util.HashMap;
Expand All @@ -16,6 +18,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.toMessagesMap;

/**
* Util to merge multiple {@link Channel}s
*/
Expand All @@ -27,52 +31,103 @@ private ChannelMerger() {}
* Merges multiple channels by channel name
* <p>
* Given two channels for the same channel name, the first seen Channel is used
* If an operation is null, the next non-null operation is used
* Messages within operations are merged
* Messages within channels are merged
*
* @param channelEntries Ordered pairs of channel name to Channel
* @return A map of channelName to a single Channel
*/
public static Map<String, ChannelObject> merge(List<Map.Entry<String, ChannelObject>> channelEntries) {
public static Map<String, ChannelObject> mergeChannels(List<Map.Entry<String, ChannelObject>> channelEntries) {
Map<String, ChannelObject> mergedChannels = new HashMap<>();

for (Map.Entry<String, ChannelObject> entry : channelEntries) {
if (!mergedChannels.containsKey(entry.getKey())) {
mergedChannels.put(entry.getKey(), entry.getValue());
} else {
ChannelObject channel = mergedChannels.get(entry.getKey());
// channel.setPublish(mergeOperation(channel.getPublish(), entry.getValue().getPublish()));
// channel.setSubscribe(mergeOperation(channel.getSubscribe(), entry.getValue().getSubscribe()));
ChannelObject channel = mergeChannel(mergedChannels.get(entry.getKey()), entry.getValue());
mergedChannels.put(entry.getKey(), channel);
}
}

return mergedChannels;
}

/**
* Merges multiple operations by operation name
* <p>
* Given two operations for the same operation name, the first seen Operation is used
* If an operation is null, the next non-null operation is used
* Messages within operations are merged
*
* @param operationEntries Ordered pairs of operation name to Operation
* @return A map of operationName to a single Operation
*/
public static Map<String, Operation> mergeOperations(List<Map.Entry<String, Operation>> operationEntries) {
Map<String, Operation> mergedOperations = new HashMap<>();

for (Map.Entry<String, Operation> entry : operationEntries) {
if (!mergedOperations.containsKey(entry.getKey())) {
mergedOperations.put(entry.getKey(), entry.getValue());
} else {
Operation operation = mergeOperation(mergedOperations.get(entry.getKey()), entry.getValue());
mergedOperations.put(entry.getKey(), operation);
}
}

return mergedOperations;
}

private static ChannelObject mergeChannel(ChannelObject channel, ChannelObject otherChannel) {
ChannelObject mergedChannel = channel != null ? channel : otherChannel;

Set<MessageObject> mergedMessages = mergeMessages(getMessages(channel), getMessages(otherChannel));
if (!mergedMessages.isEmpty()) {
mergedChannel.setMessages(toMessagesMap(mergedMessages));
}
return mergedChannel;
}

private static Operation mergeOperation(Operation operation, Operation otherOperation) {
Operation mergedOperation = operation != null ? operation : otherOperation;

Set<MessageObject> mergedMessages = mergeMessages(getMessages(operation), getMessages(otherOperation));
// if (!mergedMessages.isEmpty()) {
// mergedOperation.setMessage(toMessageObjectOrComposition(mergedMessages)); FIXME
// }
List<MessageReference> mergedMessages =
mergeMessageReferences(operation.getMessages(), otherOperation.getMessages());
if (!mergedMessages.isEmpty()) {
mergedOperation.setMessages(mergedMessages);
}
return mergedOperation;
}

private static Set<MessageObject> mergeMessages(Set<MessageObject> messages, Set<MessageObject> otherMessages) {
Map<String, MessageObject> nameToMessage =
messages.stream().collect(Collectors.toMap(MessageObject::getName, Function.identity()));
private static Set<MessageObject> mergeMessages(Set<Message> messages, Set<Message> otherMessages) {
// FIXME: We will lose any MessageReference we get
Map<String, MessageObject> nameToMessage = messages.stream()
.filter(MessageObject.class::isInstance)
.map(MessageObject.class::cast)
.collect(Collectors.toMap(MessageObject::getName, Function.identity()));

for (MessageObject otherMessage : otherMessages) {
nameToMessage.putIfAbsent(otherMessage.getName(), otherMessage);
for (Message otherMessage : otherMessages) {
if (otherMessage instanceof MessageObject otherMessageObject) {
nameToMessage.putIfAbsent(otherMessageObject.getName(), otherMessageObject);
}
}

return new HashSet<>(nameToMessage.values());
}

private static Set<MessageObject> getMessages(Operation operation) {
return Optional.ofNullable(operation)
.map(Operation::getMessages)
private static List<MessageReference> mergeMessageReferences(
List<MessageReference> messages, List<MessageReference> otherMessages) {
var messageReferences = new HashSet<MessageReference>();
if (messages != null) {
messageReferences.addAll(messages);
}
if (otherMessages != null) {
messageReferences.addAll(otherMessages);
}
return messageReferences.stream().toList();
}

private static Set<Message> getMessages(ChannelObject channel) {
return Optional.ofNullable(channel)
.map(ChannelObject::getMessages)
.map(MessageHelper::messageObjectToSet)
.orElseGet(HashSet::new);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;

import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.operation.Operation;

import java.util.Map;

Expand All @@ -10,5 +11,10 @@ public interface ChannelsScanner {
/**
* @return A mapping of channel names to their respective channel object for a given protocol.
*/
Map<String, ChannelObject> scan();
Map<String, ChannelObject> scanChannels();

/**
* @return A mapping of operation names to their respective operation object for a given protocol.
*/
Map<String, Operation> scanOperations();
}
Loading