diff --git a/api/src/main/resources/schema/workflow.yaml b/api/src/main/resources/schema/workflow.yaml index daaedc41..1cceff60 100644 --- a/api/src/main/resources/schema/workflow.yaml +++ b/api/src/main/resources/schema/workflow.yaml @@ -558,7 +558,17 @@ $defs: $ref: '#/$defs/eventConsumptionStrategy' title: ListenTo description: Defines the event(s) to listen to. + read: + type: string + enum: [ data, envelope, raw ] + default: data + title: ListenAndReadAs + description: Specifies how events are read during the listen operation. required: [ to ] + foreach: + $ref: '#/$defs/subscriptionIterator' + title: ListenIterator + description: Configures the iterator, if any, for processing consumed event(s). raiseTask: type: object $ref: '#/$defs/taskBase' @@ -1315,47 +1325,25 @@ $defs: $ref: '#/$defs/eventFilter' required: [ all ] - title: AnyEventConsumptionStrategy - oneOf: - - properties: - any: - type: array - title: AnyEventConsumptionStrategyConfiguration - description: A list containing any of the events to consume. - items: - $ref: '#/$defs/eventFilter' - minItems: 1 - until: - oneOf: - - type: string - title: AnyEventUntilCondition - description: A runtime expression condition evaluated after consuming an event and which determines whether or not to continue listening. - - allOf: - - $ref: '#/$defs/eventConsumptionStrategy' - title: AnyEventUntilConsumed - description: The strategy that defines the event(s) to consume to stop listening. - - properties: - until: false - required: [ any ] - - properties: - any: - type: array - title: AnyEventConsumptionStrategyConfiguration - description: A list containing any of the events to consume. - items: - $ref: '#/$defs/eventFilter' - maxItems: 0 - until: - oneOf: - - type: string - title: AnyEventUntilCondition - description: A runtime expression condition evaluated after consuming an event and which determines whether or not to continue listening. - - allOf: - - $ref: '#/$defs/eventConsumptionStrategy' - title: AnyEventUntilConsumed - description: The strategy that defines the event(s) to consume to stop listening. - - properties: - until: false - required: [ any, until ] + properties: + any: + type: array + title: AnyEventConsumptionStrategyConfiguration + description: A list containing any of the events to consume. + items: + $ref: '#/$defs/eventFilter' + until: + oneOf: + - type: string + title: AnyEventUntilCondition + description: A runtime expression condition evaluated after consuming an event and which determines whether or not to continue listening. + - allOf: + - $ref: '#/$defs/eventConsumptionStrategy' + description: The strategy that defines the event(s) to consume to stop listening. + - properties: + until: false + title: AnyEventUntilConsumed + required: [ any ] - title: OneEventConsumptionStrategy properties: one: @@ -1710,6 +1698,10 @@ $defs: $ref: '#/$defs/asyncApiMessageConsumptionPolicy' title: AsyncApiMessageConsumptionPolicy description: An object used to configure the subscription's message consumption policy. + foreach: + $ref: '#/$defs/subscriptionIterator' + title: AsyncApiSubscriptionIterator + description: Configures the iterator, if any, for processing consumed messages(s). required: [ consume ] asyncApiMessageConsumptionPolicy: type: object @@ -1740,3 +1732,31 @@ $defs: title: AsyncApiMessageConsumptionPolicyUntil description: A runtime expression evaluated before each consumed (filtered) message to decide if message consumption should continue. required: [ until ] + subscriptionIterator: + type: object + title: SubscriptionIterator + description: Configures the iteration over each item (event or message) consumed by a subscription. + unevaluatedProperties: false + properties: + item: + type: string + title: SubscriptionIteratorItem + description: The name of the variable used to store the current item being enumerated. + default: item + at: + type: string + title: SubscriptionIteratorIndex + description: The name of the variable used to store the index of the current item being enumerated. + default: index + do: + $ref: '#/$defs/taskList' + title: SubscriptionIteratorTasks + description: The tasks to perform for each consumed item. + output: + $ref: '#/$defs/output' + title: SubscriptionIteratorOutput + description: An object, if any, used to customize the item's output and to document its schema. + export: + $ref: '#/$defs/export' + title: SubscriptionIteratorExport + description: An object, if any, used to customize the content of the workflow context. \ No newline at end of file diff --git a/custom-generator/src/main/java/io/serverlessworkflow/generator/AllAnyOneOfSchemaRule.java b/custom-generator/src/main/java/io/serverlessworkflow/generator/AllAnyOneOfSchemaRule.java index 2fa343a0..622efcbb 100644 --- a/custom-generator/src/main/java/io/serverlessworkflow/generator/AllAnyOneOfSchemaRule.java +++ b/custom-generator/src/main/java/io/serverlessworkflow/generator/AllAnyOneOfSchemaRule.java @@ -60,6 +60,7 @@ class AllAnyOneOfSchemaRule extends SchemaRule { } private static final String REF = "$ref"; + private static final String TITLE = "title"; private static final String PATTERN = "pattern"; private enum Format { @@ -154,6 +155,16 @@ public JType apply( && allOfTypes.isEmpty() && refType.isPresent()) { javaType = refType.get(); + } else if (!schemaNode.has("properties") + && oneOfTypes.isEmpty() + && allOfTypes.size() == 1 + && refType.isEmpty()) { + javaType = allOfTypes.get(0).getType(); + } else if (!schemaNode.has("properties") + && oneOfTypes.size() == 1 + && allOfTypes.isEmpty() + && refType.isEmpty()) { + javaType = oneOfTypes.get(0).getType(); } else { JPackage container = generatableType.getPackage(); javaType = ruleFactory.getTypeRule().apply(nodeName, schemaNode, parent, container, schema); @@ -469,6 +480,9 @@ private void unionType( Collection types) { if (schemaNode.has(prefix)) { ArrayNode array = (ArrayNode) schemaNode.get(prefix); + if (schemaNode.has(TITLE)) { + nodeName = schemaNode.get(TITLE).asText(); + } int i = 0; for (JsonNode oneOf : array) { if (!ignoreNode(oneOf)) { @@ -491,6 +505,23 @@ private void unionType( } private static boolean ignoreNode(JsonNode node) { + return allRequired(node) || allRemoveProperties(node); + } + + private static boolean allRemoveProperties(JsonNode node) { + if (node.size() == 1 && node.has("properties")) { + JsonNode propsNode = node.get("properties"); + for (JsonNode propNode : propsNode) { + if (!propNode.isBoolean() || propNode.asBoolean()) { + return false; + } + } + return true; + } + return false; + } + + private static boolean allRequired(JsonNode node) { return node.size() == 1 && node.has("required"); } @@ -514,7 +545,7 @@ private Optional refType( schema.isGenerated() ? schema.getJavaType() : apply( - nameFromRef(ref, nodeName), + nameFromRef(ref, nodeName, schemaNode), schema.getContent(), parent, generatableType, @@ -556,7 +587,10 @@ private String pattern(JsonNode node) { return format != null ? format.pattern() : getFromNode(node, PATTERN); } - private String nameFromRef(String ref, String nodeName) { + private String nameFromRef(String ref, String nodeName, JsonNode schemaNode) { + if (schemaNode.has(TITLE)) { + return schemaNode.get(TITLE).asText(); + } if ("#".equals(ref)) { return nodeName; } diff --git a/custom-generator/src/main/java/io/serverlessworkflow/generator/RefNameHelper.java b/custom-generator/src/main/java/io/serverlessworkflow/generator/RefNameHelper.java new file mode 100644 index 00000000..6411e886 --- /dev/null +++ b/custom-generator/src/main/java/io/serverlessworkflow/generator/RefNameHelper.java @@ -0,0 +1,42 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.generator; + +import com.fasterxml.jackson.databind.JsonNode; +import com.sun.codemodel.JClassAlreadyExistsException; +import com.sun.codemodel.JDefinedClass; +import com.sun.codemodel.JPackage; +import org.jsonschema2pojo.GenerationConfig; +import org.jsonschema2pojo.util.NameHelper; + +public class RefNameHelper extends NameHelper { + + public RefNameHelper(GenerationConfig generationConfig) { + super(generationConfig); + } + + @Override + public String getUniqueClassName(String nodeName, JsonNode node, JPackage _package) { + String className = getClassName(nodeName, node, _package); + try { + JDefinedClass _class = _package._class(className); + _package.remove(_class); + return className; + } catch (JClassAlreadyExistsException ex) { + return super.getUniqueClassName(nodeName, null, _package); + } + } +} diff --git a/custom-generator/src/main/java/io/serverlessworkflow/generator/UnreferencedFactory.java b/custom-generator/src/main/java/io/serverlessworkflow/generator/UnreferencedFactory.java index 01263033..f101fb8d 100644 --- a/custom-generator/src/main/java/io/serverlessworkflow/generator/UnreferencedFactory.java +++ b/custom-generator/src/main/java/io/serverlessworkflow/generator/UnreferencedFactory.java @@ -18,10 +18,25 @@ import com.sun.codemodel.JClassContainer; import com.sun.codemodel.JDefinedClass; import com.sun.codemodel.JType; +import org.jsonschema2pojo.GenerationConfig; import org.jsonschema2pojo.rules.Rule; import org.jsonschema2pojo.rules.RuleFactory; +import org.jsonschema2pojo.util.NameHelper; public class UnreferencedFactory extends RuleFactory { + + private NameHelper refNameHelper; + + public UnreferencedFactory() { + this.refNameHelper = new RefNameHelper(getGenerationConfig()); + } + + @Override + public void setGenerationConfig(final GenerationConfig generationConfig) { + super.setGenerationConfig(generationConfig); + this.refNameHelper = new RefNameHelper(generationConfig); + } + @Override public Rule getSchemaRule() { return new AllAnyOneOfSchemaRule(this); @@ -36,4 +51,9 @@ public Rule getTypeRule() { public Rule getAdditionalPropertiesRule() { return new UnevaluatedPropertiesRule(this); } + + @Override + public NameHelper getNameHelper() { + return refNameHelper; + } } diff --git a/impl/core/pom.xml b/impl/core/pom.xml index a1f2d692..c36c50d7 100644 --- a/impl/core/pom.xml +++ b/impl/core/pom.xml @@ -6,20 +6,23 @@ 7.0.0-SNAPSHOT serverlessworkflow-impl-core - - 1.2.0 - 5.2.3 - io.serverlessworkflow serverlessworkflow-api - 7.0.0-SNAPSHOT + ${project.version} + + + io.cloudevents + cloudevents-api + + + io.cloudevents + cloudevents-json-jackson com.github.f4b6a3 ulid-creator - ${version.com.github.f4b6a3} com.networknt @@ -28,7 +31,6 @@ net.thisptr jackson-jq - ${version.net.thisptr} org.junit.jupiter diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/ExpressionHolder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/ExpressionHolder.java new file mode 100644 index 00000000..f899f186 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/ExpressionHolder.java @@ -0,0 +1,20 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.util.function.BiFunction; + +public interface ExpressionHolder extends BiFunction {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java b/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java index ec52d251..cf5598e7 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java @@ -15,7 +15,5 @@ */ package io.serverlessworkflow.impl; -import java.util.function.BiFunction; - @FunctionalInterface -public interface LongFilter extends BiFunction {} +public interface LongFilter extends ExpressionHolder {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/StringFilter.java b/impl/core/src/main/java/io/serverlessworkflow/impl/StringFilter.java index 3ededc3f..2fbec647 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/StringFilter.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/StringFilter.java @@ -15,7 +15,5 @@ */ package io.serverlessworkflow.impl; -import java.util.function.BiFunction; - @FunctionalInterface -public interface StringFilter extends BiFunction {} +public interface StringFilter extends ExpressionHolder {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index f36c23f6..23597057 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -18,6 +18,9 @@ import com.github.f4b6a3.ulid.UlidCreator; import io.serverlessworkflow.api.types.Document; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.events.EventConsumer; +import io.serverlessworkflow.impl.events.EventPublisher; +import io.serverlessworkflow.impl.events.InMemoryEvents; import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory; import io.serverlessworkflow.impl.executors.TaskExecutorFactory; import io.serverlessworkflow.impl.expressions.ExpressionFactory; @@ -47,29 +50,24 @@ public class WorkflowApplication implements AutoCloseable { private final WorkflowPositionFactory positionFactory; private final ExecutorServiceFactory executorFactory; private final RuntimeDescriptorFactory runtimeDescriptorFactory; + private final EventConsumer eventConsumer; + private final EventPublisher eventPublisher; private ExecutorService executorService; - public WorkflowApplication( - TaskExecutorFactory taskFactory, - ExpressionFactory exprFactory, - ResourceLoaderFactory resourceLoaderFactory, - SchemaValidatorFactory schemaValidatorFactory, - WorkflowPositionFactory positionFactory, - WorkflowIdFactory idFactory, - RuntimeDescriptorFactory runtimeDescriptorFactory, - ExecutorServiceFactory executorFactory, - Collection listeners) { - this.taskFactory = taskFactory; - this.exprFactory = exprFactory; - this.resourceLoaderFactory = resourceLoaderFactory; - this.schemaValidatorFactory = schemaValidatorFactory; - this.positionFactory = positionFactory; - this.idFactory = idFactory; - this.runtimeDescriptorFactory = runtimeDescriptorFactory; - this.executorFactory = executorFactory; - this.listeners = listeners; + private WorkflowApplication(Builder builder) { + this.taskFactory = builder.taskFactory; + this.exprFactory = builder.exprFactory; + this.resourceLoaderFactory = builder.resourceLoaderFactory; + this.schemaValidatorFactory = builder.schemaValidatorFactory; + this.positionFactory = builder.positionFactory; + this.idFactory = builder.idFactory; + this.runtimeDescriptorFactory = builder.descriptorFactory; + this.executorFactory = builder.executorFactory; + this.listeners = builder.listeners != null ? builder.listeners : Collections.emptySet(); this.definitions = new ConcurrentHashMap<>(); + this.eventConsumer = builder.eventConsumer; + this.eventPublisher = builder.eventPublisher; } public TaskExecutorFactory taskFactory() { @@ -96,6 +94,10 @@ public Collection listeners() { return listeners; } + public EventPublisher eventPublisher() { + return eventPublisher; + } + public WorkflowIdFactory idFactory() { return idFactory; } @@ -109,6 +111,8 @@ public static class Builder { private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition(); private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString(); private ExecutorServiceFactory executorFactory = () -> Executors.newCachedThreadPool(); + private EventConsumer eventConsumer = InMemoryEvents.get(); + private EventPublisher eventPublisher = InMemoryEvents.get(); private RuntimeDescriptorFactory descriptorFactory = () -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap()); @@ -162,19 +166,18 @@ public Builder withDescriptorFactory(RuntimeDescriptorFactory factory) { return this; } + public Builder withEventConsumer(EventConsumer eventConsumer) { + this.eventConsumer = eventConsumer; + return this; + } + + public Builder withEventPublisher(EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + return this; + } + public WorkflowApplication build() { - return new WorkflowApplication( - taskFactory, - exprFactory, - resourceLoaderFactory, - schemaValidatorFactory, - positionFactory, - idFactory, - descriptorFactory, - executorFactory, - listeners == null - ? Collections.emptySet() - : Collections.unmodifiableCollection(listeners)); + return new WorkflowApplication(this); } } @@ -205,6 +208,11 @@ public RuntimeDescriptorFactory runtimeDescriptorFactory() { return runtimeDescriptorFactory; } + @SuppressWarnings("rawtypes") + public EventConsumer eventConsumer() { + return eventConsumer; + } + public ExecutorService executorService() { synchronized (executorFactory) { if (executorService == null) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 0a174f69..6566cf6b 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -101,10 +101,6 @@ public Optional outputFilter() { return outputFilter; } - public WorkflowIdFactory idFactory() { - return application.idFactory(); - } - public Optional outputSchemaValidator() { return outputSchemaValidator; } @@ -113,6 +109,10 @@ public RuntimeDescriptorFactory runtimeDescriptorFactory() { return application.runtimeDescriptorFactory(); } + public WorkflowApplication application() { + return application; + } + @Override public void close() { // TODO close resourcers hold for uncompleted process instances, if any diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java index bbf84d39..2e55c484 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java @@ -36,7 +36,7 @@ public class WorkflowInstance { private CompletableFuture completableFuture; WorkflowInstance(WorkflowDefinition definition, JsonNode input) { - this.id = definition.idFactory().get(); + this.id = definition.application().idFactory().get(); this.input = input; this.definition = definition; this.status = new AtomicReference<>(WorkflowStatus.PENDING); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index 0866ba05..5feaf04e 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -24,6 +24,7 @@ import io.serverlessworkflow.api.types.SchemaExternal; import io.serverlessworkflow.api.types.SchemaInline; import io.serverlessworkflow.api.types.SchemaUnion; +import io.serverlessworkflow.api.types.UriTemplate; import io.serverlessworkflow.impl.expressions.Expression; import io.serverlessworkflow.impl.expressions.ExpressionFactory; import io.serverlessworkflow.impl.expressions.ExpressionUtils; @@ -35,8 +36,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; +import java.net.URI; import java.util.Map; import java.util.Optional; +import java.util.function.Function; public class WorkflowUtils { @@ -81,6 +84,25 @@ public static Optional buildWorkflowFilter( : Optional.empty(); } + public static ExpressionHolder buildExpressionHolder( + ExpressionFactory exprFactory, + String expression, + T literal, + Function converter) { + return expression != null + ? buildExpressionHolder(buildWorkflowFilter(exprFactory, expression), converter) + : buildExpressionHolder(literal); + } + + private static ExpressionHolder buildExpressionHolder( + WorkflowFilter filter, Function converter) { + return (w, t) -> converter.apply(filter.apply(w, t, t.input())); + } + + private static ExpressionHolder buildExpressionHolder(T literal) { + return (w, t) -> literal; + } + public static Optional buildWorkflowFilter( ExpressionFactory exprFactory, ExportAs as) { return as != null @@ -109,7 +131,7 @@ private static StringFilter toString(String literal) { return (w, t) -> literal; } - private static WorkflowFilter buildWorkflowFilter( + public static WorkflowFilter buildWorkflowFilter( ExpressionFactory exprFactory, String str, Object object) { if (str != null) { return buildWorkflowFilter(exprFactory, str); @@ -148,4 +170,9 @@ public static WorkflowFilter buildWorkflowFilter(ExpressionFactory exprFactory, public static Optional optionalFilter(ExpressionFactory exprFactory, String str) { return str != null ? Optional.of(buildWorkflowFilter(exprFactory, str)) : Optional.empty(); } + + public static String toString(UriTemplate template) { + URI uri = template.getLiteralUri(); + return uri != null ? uri.toString() : template.getLiteralUriTemplate(); + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java new file mode 100644 index 00000000..a3222342 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java @@ -0,0 +1,136 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.api.types.EventFilter; +import io.serverlessworkflow.api.types.EventProperties; +import io.serverlessworkflow.impl.WorkflowApplication; +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractTypeConsumer + implements EventConsumer { + + private static final Logger logger = LoggerFactory.getLogger(AbstractTypeConsumer.class); + + protected abstract void registerToAll(Consumer consumer); + + protected abstract void unregisterFromAll(); + + protected abstract void register(String topicName, Consumer consumer); + + protected abstract void unregister(String topicName); + + private Map registrations = new ConcurrentHashMap<>(); + + @Override + public TypeEventRegistrationBuilder listen( + EventFilter register, WorkflowApplication application) { + EventProperties properties = register.getWith(); + String type = properties.getType(); + return new TypeEventRegistrationBuilder( + type, new DefaultCloudEventPredicate(properties, application.expressionFactory())); + } + + @Override + public Collection listenToAll(WorkflowApplication application) { + return List.of(new TypeEventRegistrationBuilder(null, null)); + } + + private static class CloudEventConsumer extends AbstractCollection + implements Consumer { + private Collection registrations = new CopyOnWriteArrayList<>(); + + @Override + public void accept(CloudEvent ce) { + logger.debug("Received cloud event {}", ce); + for (TypeEventRegistration registration : registrations) { + if (registration.predicate().test(ce)) { + registration.consumer().accept(ce); + } + } + } + + @Override + public boolean add(TypeEventRegistration registration) { + return registrations.add(registration); + } + + @Override + public boolean remove(Object registration) { + return registrations.remove(registration); + } + + @Override + public Iterator iterator() { + return registrations.iterator(); + } + + @Override + public int size() { + return registrations.size(); + } + } + + public TypeEventRegistration register( + TypeEventRegistrationBuilder builder, Consumer ce) { + if (builder.type() == null) { + registerToAll(ce); + return new TypeEventRegistration(null, ce, null); + } else { + TypeEventRegistration registration = + new TypeEventRegistration(builder.type(), ce, builder.cePredicate()); + registrations + .computeIfAbsent( + registration.type(), + k -> { + CloudEventConsumer consumer = new CloudEventConsumer(); + register(k, consumer); + return consumer; + }) + .add(registration); + return registration; + } + } + + @Override + public void unregister(TypeEventRegistration registration) { + if (registration.type() == null) { + unregisterFromAll(); + } else { + registrations.computeIfPresent( + registration.type(), + (k, v) -> { + v.remove(registration); + if (v.isEmpty()) { + unregister(registration.type()); + return null; + } else { + return v; + } + }); + } + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventAttrPredicate.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventAttrPredicate.java new file mode 100644 index 00000000..6029d484 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventAttrPredicate.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +@FunctionalInterface +public interface CloudEventAttrPredicate { + boolean test(T value); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicate.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicate.java new file mode 100644 index 00000000..a790e371 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicate.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import io.cloudevents.CloudEvent; + +public interface CloudEventPredicate { + boolean test(CloudEvent event); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventUtils.java new file mode 100644 index 00000000..1b2709b8 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventUtils.java @@ -0,0 +1,101 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.jackson.JsonCloudEventData; +import io.serverlessworkflow.impl.json.JsonUtils; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.Map; + +public class CloudEventUtils { + + public static JsonNode toJsonNode(CloudEvent event) { + ObjectNode result = JsonUtils.mapper().createObjectNode(); + if (event.getData() != null) { + result.set("data", toJsonNode(event.getData())); + } + if (event.getSubject() != null) { + result.put("subject", event.getSubject()); + } + if (event.getDataContentType() != null) { + result.put("datacontenttype", event.getDataContentType()); + } + result.put("id", event.getId()); + result.put("source", event.getSource().toString()); + result.put("type", event.getType()); + result.put("specversion", event.getSpecVersion().toString()); + if (event.getDataSchema() != null) { + result.put("dataschema", event.getDataSchema().toString()); + } + if (event.getTime() != null) { + result.put("time", event.getTime().toString()); + } + event + .getExtensionNames() + .forEach(n -> result.set(n, JsonUtils.fromValue(event.getExtension(n)))); + return result; + } + + public static OffsetDateTime toOffset(Date date) { + return date.toInstant().atOffset(ZoneOffset.UTC); + } + + public static CloudEventBuilder addExtension( + CloudEventBuilder builder, String name, JsonNode value) { + if (value.isTextual()) { + builder.withExtension(name, value.asText()); + } else if (value.isBoolean()) { + builder.withExtension(name, value.isBoolean()); + } else if (value.isNumber()) { + builder.withExtension(name, value.numberValue()); + } + return builder; + } + + public static JsonNode toJsonNode(CloudEventData data) { + if (data == null) { + return NullNode.instance; + } + try { + return data instanceof JsonCloudEventData + ? ((JsonCloudEventData) data).getNode() + : JsonUtils.mapper().readTree(data.toBytes()); + } catch (IOException io) { + throw new UncheckedIOException(io); + } + } + + public static Map extensions(CloudEvent event) { + Map result = new LinkedHashMap<>(); + for (String name : event.getExtensionNames()) { + result.put(name, event.getExtension(name)); + } + return result; + } + + private CloudEventUtils() {} +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java new file mode 100644 index 00000000..6eb35995 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java @@ -0,0 +1,154 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import com.fasterxml.jackson.databind.JsonNode; +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.api.types.EventData; +import io.serverlessworkflow.api.types.EventDataschema; +import io.serverlessworkflow.api.types.EventProperties; +import io.serverlessworkflow.api.types.EventSource; +import io.serverlessworkflow.api.types.EventTime; +import io.serverlessworkflow.api.types.UriTemplate; +import io.serverlessworkflow.impl.WorkflowFilter; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.expressions.Expression; +import io.serverlessworkflow.impl.expressions.ExpressionFactory; +import io.serverlessworkflow.impl.json.JsonUtils; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.Map; +import java.util.Objects; + +public class DefaultCloudEventPredicate implements CloudEventPredicate { + + private final CloudEventAttrPredicate idFilter; + private final CloudEventAttrPredicate sourceFilter; + private final CloudEventAttrPredicate subjectFilter; + private final CloudEventAttrPredicate contentTypeFilter; + private final CloudEventAttrPredicate typeFilter; + private final CloudEventAttrPredicate dataSchemaFilter; + private final CloudEventAttrPredicate timeFilter; + private final CloudEventAttrPredicate dataFilter; + private final CloudEventAttrPredicate additionalFilter; + + private static final CloudEventAttrPredicate isTrue() { + return x -> true; + } + + public DefaultCloudEventPredicate(EventProperties properties, ExpressionFactory exprFactory) { + idFilter = stringFilter(properties.getId()); + subjectFilter = stringFilter(properties.getSubject()); + typeFilter = stringFilter(properties.getType()); + contentTypeFilter = stringFilter(properties.getDatacontenttype()); + sourceFilter = sourceFilter(properties.getSource(), exprFactory); + dataSchemaFilter = dataSchemaFilter(properties.getDataschema(), exprFactory); + timeFilter = offsetTimeFilter(properties.getTime(), exprFactory); + dataFilter = dataFilter(properties.getData(), exprFactory); + additionalFilter = additionalFilter(properties.getAdditionalProperties(), exprFactory); + } + + private CloudEventAttrPredicate additionalFilter( + Map additionalProperties, ExpressionFactory exprFactory) { + return additionalProperties != null && !additionalProperties.isEmpty() + ? from(WorkflowUtils.buildWorkflowFilter(exprFactory, null, additionalProperties)) + : isTrue(); + } + + private CloudEventAttrPredicate from(WorkflowFilter filter) { + return d -> filter.apply(null, null, d).asBoolean(); + } + + private CloudEventAttrPredicate dataFilter( + EventData data, ExpressionFactory exprFactory) { + return data != null + ? from( + WorkflowUtils.buildWorkflowFilter( + exprFactory, data.getRuntimeExpression(), data.getObject())) + : isTrue(); + } + + private CloudEventAttrPredicate offsetTimeFilter( + EventTime time, ExpressionFactory exprFactory) { + if (time != null) { + if (time.getRuntimeExpression() != null) { + final Expression expr = exprFactory.getExpression(time.getRuntimeExpression()); + return s -> evalExpr(expr, toString(s)); + } else if (time.getLiteralTime() != null) { + return s -> Objects.equals(s, CloudEventUtils.toOffset(time.getLiteralTime())); + } + } + return isTrue(); + } + + private CloudEventAttrPredicate dataSchemaFilter( + EventDataschema dataSchema, ExpressionFactory exprFactory) { + if (dataSchema != null) { + if (dataSchema.getExpressionDataSchema() != null) { + final Expression expr = exprFactory.getExpression(dataSchema.getExpressionDataSchema()); + return s -> evalExpr(expr, toString(s)); + } else if (dataSchema.getLiteralDataSchema() != null) { + return templateFilter(dataSchema.getLiteralDataSchema()); + } + } + return isTrue(); + } + + private CloudEventAttrPredicate stringFilter(String str) { + return str == null ? isTrue() : x -> x.equals(str); + } + + private CloudEventAttrPredicate sourceFilter( + EventSource source, ExpressionFactory exprFactory) { + if (source != null) { + if (source.getRuntimeExpression() != null) { + final Expression expr = exprFactory.getExpression(source.getRuntimeExpression()); + return s -> evalExpr(expr, toString(s)); + } else if (source.getUriTemplate() != null) { + return templateFilter(source.getUriTemplate()); + } + } + return isTrue(); + } + + private CloudEventAttrPredicate templateFilter(UriTemplate template) { + if (template.getLiteralUri() != null) { + return u -> Objects.equals(u, template.getLiteralUri()); + } + throw new UnsupportedOperationException("Template not supporte here yet"); + } + + private String toString(T uri) { + return uri != null ? uri.toString() : null; + } + + private boolean evalExpr(Expression expr, T value) { + return expr.eval(null, null, JsonUtils.fromValue(value)).asBoolean(); + } + + @Override + public boolean test(CloudEvent event) { + return idFilter.test(event.getId()) + && sourceFilter.test(event.getSource()) + && subjectFilter.test(event.getSubject()) + && contentTypeFilter.test(event.getDataContentType()) + && typeFilter.test(event.getType()) + && dataSchemaFilter.test(event.getDataSchema()) + && timeFilter.test(event.getTime()) + && dataFilter.test(CloudEventUtils.toJsonNode(event.getData())) + && additionalFilter.test(JsonUtils.fromValue(CloudEventUtils.extensions(event))); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java new file mode 100644 index 00000000..00c1619e --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.api.types.EventFilter; +import io.serverlessworkflow.impl.WorkflowApplication; +import java.util.Collection; +import java.util.function.Consumer; + +public interface EventConsumer { + + V listen(EventFilter filter, WorkflowApplication workflowApplication); + + Collection listenToAll(WorkflowApplication workflowApplication); + + T register(V builder, Consumer consumer); + + void unregister(T register); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventPublisher.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventPublisher.java new file mode 100644 index 00000000..08cc121d --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventPublisher.java @@ -0,0 +1,23 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import io.cloudevents.CloudEvent; +import java.util.concurrent.CompletableFuture; + +public interface EventPublisher { + CompletableFuture publish(CloudEvent event); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistration.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistration.java new file mode 100644 index 00000000..923647d5 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistration.java @@ -0,0 +1,18 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +public interface EventRegistration {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilder.java new file mode 100644 index 00000000..e81723ff --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilder.java @@ -0,0 +1,18 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +public interface EventRegistrationBuilder {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java new file mode 100644 index 00000000..3993f8fe --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java @@ -0,0 +1,77 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import io.cloudevents.CloudEvent; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +/* + * Straightforward implementation of in memory event broker. + * User might invoke notifyCE to simulate event reception. + */ +public class InMemoryEvents extends AbstractTypeConsumer implements EventPublisher { + + private static InMemoryEvents instance = new InMemoryEvents(); + + private InMemoryEvents() {} + + public static InMemoryEvents get() { + return instance; + } + + private Map> topicMap = new ConcurrentHashMap<>(); + + private AtomicReference> allConsumerRef = new AtomicReference<>(); + + @Override + protected void register(String topicName, Consumer consumer) { + topicMap.put(topicName, consumer); + } + + @Override + protected void unregister(String topicName) { + topicMap.remove(topicName); + } + + @Override + public CompletableFuture publish(CloudEvent ce) { + return CompletableFuture.runAsync( + () -> { + Consumer allConsumer = allConsumerRef.get(); + if (allConsumer != null) { + allConsumer.accept(ce); + } + Consumer consumer = topicMap.get(ce.getType()); + if (consumer != null) { + consumer.accept(ce); + } + }); + } + + @Override + protected void registerToAll(Consumer consumer) { + allConsumerRef.set(consumer); + } + + @Override + protected void unregisterFromAll() { + allConsumerRef.set(null); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java new file mode 100644 index 00000000..8fdf2388 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java @@ -0,0 +1,24 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.serverlessworkflow.impl.events; + +import io.cloudevents.CloudEvent; +import java.util.function.Consumer; + +public record TypeEventRegistration( + String type, Consumer consumer, CloudEventPredicate predicate) + implements EventRegistration {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistrationBuilder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistrationBuilder.java new file mode 100644 index 00000000..bd504a76 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistrationBuilder.java @@ -0,0 +1,20 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.serverlessworkflow.impl.events; + +public record TypeEventRegistrationBuilder(String type, CloudEventPredicate cePredicate) + implements EventRegistrationBuilder {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java index 1aac152c..0499fced 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java @@ -28,8 +28,10 @@ import io.serverlessworkflow.impl.WorkflowPosition; import io.serverlessworkflow.impl.executors.CallTaskExecutor.CallTaskExecutorBuilder; import io.serverlessworkflow.impl.executors.DoExecutor.DoExecutorBuilder; +import io.serverlessworkflow.impl.executors.EmitExecutor.EmitExecutorBuilder; import io.serverlessworkflow.impl.executors.ForExecutor.ForExecutorBuilder; import io.serverlessworkflow.impl.executors.ForkExecutor.ForkExecutorBuilder; +import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder; import io.serverlessworkflow.impl.executors.RaiseExecutor.RaiseExecutorBuilder; import io.serverlessworkflow.impl.executors.SetExecutor.SetExecutorBuilder; import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder; @@ -125,6 +127,12 @@ public TaskExecutorBuilder getTaskExecutor( } else if (task.getWaitTask() != null) { return new WaitExecutorBuilder( position, task.getWaitTask(), workflow, application, resourceLoader); + } else if (task.getListenTask() != null) { + return new ListenExecutorBuilder( + position, task.getListenTask(), workflow, application, resourceLoader); + } else if (task.getEmitTask() != null) { + return new EmitExecutorBuilder( + position, task.getEmitTask(), workflow, application, resourceLoader); } throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet"); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java new file mode 100644 index 00000000..7a8eb09d --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java @@ -0,0 +1,217 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import com.fasterxml.jackson.databind.JsonNode; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.jackson.JsonCloudEventData; +import io.serverlessworkflow.api.types.EmitTask; +import io.serverlessworkflow.api.types.EventData; +import io.serverlessworkflow.api.types.EventDataschema; +import io.serverlessworkflow.api.types.EventProperties; +import io.serverlessworkflow.api.types.EventSource; +import io.serverlessworkflow.api.types.EventTime; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.ExpressionHolder; +import io.serverlessworkflow.impl.StringFilter; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowFilter; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.events.CloudEventUtils; +import io.serverlessworkflow.impl.expressions.ExpressionFactory; +import io.serverlessworkflow.impl.json.JsonUtils; +import io.serverlessworkflow.impl.resources.ResourceLoader; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +public class EmitExecutor extends RegularTaskExecutor { + + private final EventPropertiesBuilder props; + + public static class EmitExecutorBuilder extends RegularTaskExecutorBuilder { + + private EventPropertiesBuilder eventBuilder; + + protected EmitExecutorBuilder( + WorkflowPosition position, + EmitTask task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + super(position, task, workflow, application, resourceLoader); + this.eventBuilder = + EventPropertiesBuilder.build( + task.getEmit().getEvent().getWith(), application.expressionFactory()); + } + + @Override + public TaskExecutor buildInstance() { + return new EmitExecutor(this); + } + } + + private EmitExecutor(EmitExecutorBuilder builder) { + super(builder); + this.props = builder.eventBuilder; + } + + @Override + protected CompletableFuture internalExecute( + WorkflowContext workflow, TaskContext taskContext) { + return workflow + .definition() + .application() + .eventPublisher() + .publish(buildCloudEvent(workflow, taskContext)) + .thenApply(v -> taskContext.input()); + } + + private CloudEvent buildCloudEvent(WorkflowContext workflow, TaskContext taskContext) { + io.cloudevents.core.v1.CloudEventBuilder ceBuilder = CloudEventBuilder.v1(); + ceBuilder.withId( + props + .idFilter() + .map(filter -> filter.apply(workflow, taskContext)) + .orElse(UUID.randomUUID().toString())); + ceBuilder.withSource( + props + .sourceFilter() + .map(filter -> filter.apply(workflow, taskContext)) + .map(URI::create) + .orElse(URI.create("reference-impl"))); + ceBuilder.withType( + props + .typeFilter() + .map(filter -> filter.apply(workflow, taskContext)) + .orElseThrow( + () -> new IllegalArgumentException("Type is required for emitting events"))); + props + .timeFilter() + .map(filter -> filter.apply(workflow, taskContext)) + .ifPresent(value -> ceBuilder.withTime(value)); + props + .subjectFilter() + .map(filter -> filter.apply(workflow, taskContext)) + .ifPresent(value -> ceBuilder.withSubject(value)); + props + .dataSchemaFilter() + .map(filter -> filter.apply(workflow, taskContext)) + .ifPresent(value -> ceBuilder.withDataSchema(URI.create(value))); + props + .contentTypeFilter() + .map(filter -> filter.apply(workflow, taskContext)) + .ifPresent(value -> ceBuilder.withDataContentType(value)); + props + .dataFilter() + .map(filter -> filter.apply(workflow, taskContext, taskContext.input())) + .ifPresent(value -> ceBuilder.withData(JsonCloudEventData.wrap(value))); + props + .additionalFilter() + .map(filter -> filter.apply(workflow, taskContext, taskContext.input())) + .ifPresent( + value -> + value + .fields() + .forEachRemaining( + e -> CloudEventUtils.addExtension(ceBuilder, e.getKey(), e.getValue()))); + return ceBuilder.build(); + } + + private static record EventPropertiesBuilder( + Optional idFilter, + Optional sourceFilter, + Optional subjectFilter, + Optional contentTypeFilter, + Optional typeFilter, + Optional dataSchemaFilter, + Optional> timeFilter, + Optional dataFilter, + Optional additionalFilter) { + + public static EventPropertiesBuilder build( + EventProperties properties, ExpressionFactory exprFactory) { + Optional idFilter = buildFilter(exprFactory, properties.getId()); + EventSource source = properties.getSource(); + Optional sourceFilter = + source == null + ? Optional.empty() + : Optional.of( + WorkflowUtils.buildStringFilter( + exprFactory, + source.getRuntimeExpression(), + WorkflowUtils.toString(source.getUriTemplate()))); + Optional subjectFilter = buildFilter(exprFactory, properties.getSubject()); + Optional contentTypeFilter = + buildFilter(exprFactory, properties.getDatacontenttype()); + Optional typeFilter = buildFilter(exprFactory, properties.getType()); + EventDataschema dataSchema = properties.getDataschema(); + Optional dataSchemaFilter = + dataSchema == null + ? Optional.empty() + : Optional.of( + WorkflowUtils.buildStringFilter( + exprFactory, + dataSchema.getExpressionDataSchema(), + WorkflowUtils.toString(dataSchema.getLiteralDataSchema()))); + EventTime time = properties.getTime(); + Optional> timeFilter = + time == null + ? Optional.empty() + : Optional.of( + WorkflowUtils.buildExpressionHolder( + exprFactory, + time.getRuntimeExpression(), + CloudEventUtils.toOffset(time.getLiteralTime()), + JsonUtils::toOffsetDateTime)); + EventData data = properties.getData(); + Optional dataFilter = + properties.getData() == null + ? Optional.empty() + : Optional.of( + WorkflowUtils.buildWorkflowFilter( + exprFactory, data.getRuntimeExpression(), data.getObject())); + Map ceAttrs = properties.getAdditionalProperties(); + Optional additionalFilter = + ceAttrs == null || ceAttrs.isEmpty() + ? Optional.empty() + : Optional.of(WorkflowUtils.buildWorkflowFilter(exprFactory, null, ceAttrs)); + return new EventPropertiesBuilder( + idFilter, + sourceFilter, + subjectFilter, + contentTypeFilter, + typeFilter, + dataSchemaFilter, + timeFilter, + dataFilter, + additionalFilter); + } + + private static Optional buildFilter(ExpressionFactory exprFactory, String str) { + return str == null + ? Optional.empty() + : Optional.of(WorkflowUtils.buildStringFilter(exprFactory, str)); + } + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java new file mode 100644 index 00000000..1a58d656 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java @@ -0,0 +1,315 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.api.types.AllEventConsumptionStrategy; +import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy; +import io.serverlessworkflow.api.types.EventConsumptionStrategy; +import io.serverlessworkflow.api.types.EventFilter; +import io.serverlessworkflow.api.types.ListenTask; +import io.serverlessworkflow.api.types.ListenTaskConfiguration; +import io.serverlessworkflow.api.types.ListenTaskConfiguration.ListenAndReadAs; +import io.serverlessworkflow.api.types.ListenTo; +import io.serverlessworkflow.api.types.OneEventConsumptionStrategy; +import io.serverlessworkflow.api.types.SubscriptionIterator; +import io.serverlessworkflow.api.types.Until; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowFilter; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.events.CloudEventUtils; +import io.serverlessworkflow.impl.events.EventConsumer; +import io.serverlessworkflow.impl.events.EventRegistration; +import io.serverlessworkflow.impl.events.EventRegistrationBuilder; +import io.serverlessworkflow.impl.json.JsonUtils; +import io.serverlessworkflow.impl.resources.ResourceLoader; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +public abstract class ListenExecutor extends RegularTaskExecutor { + + protected final EventRegistrationBuilderCollection regBuilders; + protected final EventRegistrationBuilderCollection untilRegBuilders; + protected final Optional until; + protected final Optional> loop; + protected final Function converter; + protected final EventConsumer eventConsumer; + protected final AtomicBoolean untilEvent = new AtomicBoolean(true); + + private static record EventRegistrationBuilderCollection( + Collection registrations, boolean isAnd) {} + + public static class ListenExecutorBuilder extends RegularTaskExecutorBuilder { + + private EventRegistrationBuilderCollection registrations; + private WorkflowFilter until; + private EventRegistrationBuilderCollection untilRegistrations; + private TaskExecutor loop; + private Function converter = this::defaultCEConverter; + + private EventRegistrationBuilderCollection allEvents(AllEventConsumptionStrategy allStrategy) { + return new EventRegistrationBuilderCollection(from(allStrategy.getAll()), true); + } + + private EventRegistrationBuilderCollection anyEvents(AnyEventConsumptionStrategy anyStrategy) { + List eventFilters = anyStrategy.getAny(); + return new EventRegistrationBuilderCollection( + eventFilters.isEmpty() ? registerToAll() : from(eventFilters), false); + } + + private EventRegistrationBuilderCollection oneEvent(OneEventConsumptionStrategy oneStrategy) { + return new EventRegistrationBuilderCollection(List.of(from(oneStrategy.getOne())), false); + } + + protected ListenExecutorBuilder( + WorkflowPosition position, + ListenTask task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + super(position, task, workflow, application, resourceLoader); + ListenTaskConfiguration listen = task.getListen(); + ListenTo to = listen.getTo(); + if (to.getAllEventConsumptionStrategy() != null) { + registrations = allEvents(to.getAllEventConsumptionStrategy()); + } else if (to.getAnyEventConsumptionStrategy() != null) { + AnyEventConsumptionStrategy any = to.getAnyEventConsumptionStrategy(); + registrations = anyEvents(any); + Until untilDesc = any.getUntil(); + if (untilDesc != null) { + if (untilDesc.getAnyEventUntilCondition() != null) { + until = + WorkflowUtils.buildWorkflowFilter( + application.expressionFactory(), untilDesc.getAnyEventUntilCondition()); + } else if (untilDesc.getAnyEventUntilConsumed() != null) { + EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed(); + if (strategy.getAllEventConsumptionStrategy() != null) { + untilRegistrations = allEvents(strategy.getAllEventConsumptionStrategy()); + } else if (strategy.getAnyEventConsumptionStrategy() != null) { + untilRegistrations = anyEvents(strategy.getAnyEventConsumptionStrategy()); + } else if (strategy.getOneEventConsumptionStrategy() != null) { + untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy()); + } + } + } + } else if (to.getOneEventConsumptionStrategy() != null) { + registrations = oneEvent(to.getOneEventConsumptionStrategy()); + } + SubscriptionIterator forEach = task.getForeach(); + if (forEach != null) { + loop = + TaskExecutorHelper.createExecutorList( + position, forEach.getDo(), workflow, application, resourceLoader); + } + ListenAndReadAs readAs = listen.getRead(); + if (readAs != null) { + switch (readAs) { + case ENVELOPE: + converter = CloudEventUtils::toJsonNode; + default: + case DATA: + converter = this::defaultCEConverter; + break; + } + } + } + + private Collection registerToAll() { + return application.eventConsumer().listenToAll(application); + } + + private JsonNode defaultCEConverter(CloudEvent ce) { + return CloudEventUtils.toJsonNode(ce.getData()); + } + + private Collection from(List filters) { + return filters.stream().map(this::from).collect(Collectors.toList()); + } + + private EventRegistrationBuilder from(EventFilter filter) { + return application.eventConsumer().listen(filter, application); + } + + @Override + public TaskExecutor buildInstance() { + return registrations.isAnd() ? new AndListenExecutor(this) : new OrListenExecutor(this); + } + } + + public static class AndListenExecutor extends ListenExecutor { + + public AndListenExecutor(ListenExecutorBuilder builder) { + super(builder); + } + + protected void internalProcessCe( + JsonNode node, + ArrayNode arrayNode, + WorkflowContext workflow, + TaskContext taskContext, + CompletableFuture future) { + arrayNode.add(node); + future.complete(node); + } + + @Override + protected CompletableFuture combine(CompletableFuture[] completables) { + return CompletableFuture.allOf(completables); + } + } + + public static class OrListenExecutor extends ListenExecutor { + + public OrListenExecutor(ListenExecutorBuilder builder) { + super(builder); + } + + @Override + protected CompletableFuture combine(CompletableFuture[] completables) { + return CompletableFuture.anyOf(completables); + } + + protected void internalProcessCe( + JsonNode node, + ArrayNode arrayNode, + WorkflowContext workflow, + TaskContext taskContext, + CompletableFuture future) { + arrayNode.add(node); + if ((until.isEmpty() + || until + .filter(u -> u.apply(workflow, taskContext, arrayNode).asBoolean()) + .isPresent()) + && untilEvent.get()) { + future.complete(arrayNode); + } + } + } + + protected abstract CompletableFuture combine(CompletableFuture[] completables); + + protected abstract void internalProcessCe( + JsonNode node, + ArrayNode arrayNode, + WorkflowContext workflow, + TaskContext taskContext, + CompletableFuture future); + + @Override + protected CompletableFuture internalExecute( + WorkflowContext workflow, TaskContext taskContext) { + ArrayNode output = JsonUtils.mapper().createArrayNode(); + Collection registrations = new ArrayList<>(); + if (untilRegBuilders != null) { + untilEvent.set(false); + } + CompletableFuture combinedFuture = + combine( + toCompletables( + regBuilders, + registrations, + (ce, future) -> + processCe(converter.apply(ce), output, workflow, taskContext, future))); + CompletableFuture resultFuture = + combinedFuture.thenApply( + v -> { + registrations.forEach(reg -> eventConsumer.unregister(reg)); + return output; + }); + if (untilRegBuilders != null) { + Collection untilRegistrations = new ArrayList<>(); + CompletableFuture[] futures = + toCompletables( + untilRegBuilders, untilRegistrations, (ce, future) -> future.complete(null)); + CompletableFuture untilFuture = + untilRegBuilders.isAnd() + ? CompletableFuture.allOf(futures) + : CompletableFuture.anyOf(futures); + untilFuture.thenAccept( + v -> { + untilEvent.set(true); + combinedFuture.complete(null); + untilRegistrations.forEach(reg -> eventConsumer.unregister(reg)); + }); + } + return resultFuture; + } + + private CompletableFuture[] toCompletables( + EventRegistrationBuilderCollection regCollection, + Collection registrations, + BiConsumer> consumer) { + return regCollection.registrations().stream() + .map(reg -> toCompletable(reg, registrations, consumer)) + .toArray(size -> new CompletableFuture[size]); + } + + private CompletableFuture toCompletable( + EventRegistrationBuilder regBuilder, + Collection registrations, + BiConsumer> ceConsumer) { + final CompletableFuture future = new CompletableFuture<>(); + registrations.add( + eventConsumer.register(regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future))); + return future; + } + + private void processCe( + JsonNode node, + ArrayNode arrayNode, + WorkflowContext workflow, + TaskContext taskContext, + CompletableFuture future) { + loop.ifPresentOrElse( + t -> { + SubscriptionIterator forEach = task.getForeach(); + String item = forEach.getItem(); + if (item != null) { + taskContext.variables().put(item, node); + } + String at = forEach.getAt(); + if (at != null) { + taskContext.variables().put(at, arrayNode.size()); + } + TaskExecutorHelper.processTaskList(t, workflow, Optional.of(taskContext), node) + .thenAccept(n -> internalProcessCe(n, arrayNode, workflow, taskContext, future)); + }, + () -> internalProcessCe(node, arrayNode, workflow, taskContext, future)); + } + + protected ListenExecutor(ListenExecutorBuilder builder) { + super(builder); + this.eventConsumer = builder.application.eventConsumer(); + this.regBuilders = builder.registrations; + this.until = Optional.ofNullable(builder.until); + this.loop = Optional.ofNullable(builder.loop); + this.converter = builder.converter; + this.untilRegBuilders = builder.untilRegistrations; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java index 85598317..2f1ea1b6 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java @@ -27,6 +27,7 @@ import io.serverlessworkflow.impl.resources.ResourceLoader; import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; public class WaitExecutor extends RegularTaskExecutor { @@ -71,14 +72,7 @@ protected WaitExecutor(WaitExecutorBuilder builder) { @Override protected CompletableFuture internalExecute( WorkflowContext workflow, TaskContext taskContext) { - return CompletableFuture.supplyAsync( - () -> { - try { - Thread.sleep(millisToWait.toMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - return taskContext.input(); - }); + return new CompletableFuture() + .completeOnTimeout(taskContext.output(), millisToWait.toMillis(), TimeUnit.MILLISECONDS); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java index 35ca13c7..6041515a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java @@ -83,11 +83,15 @@ private Scope createScope(WorkflowContext workflow, TaskContext task) { childScope.setValue("task", () -> JsonUtils.fromValue(TaskDescriptor.of(task))); task.variables().forEach((k, v) -> childScope.setValue(k, JsonUtils.fromValue(v))); } - childScope.setValue("context", workflow.context()); - childScope.setValue( - "runtime", - () -> JsonUtils.fromValue(workflow.definition().runtimeDescriptorFactory().get())); - childScope.setValue("workflow", () -> JsonUtils.fromValue(WorkflowDescriptor.of(workflow))); + if (workflow != null) { + childScope.setValue("context", workflow.context()); + childScope.setValue( + "runtime", + () -> + JsonUtils.fromValue( + workflow.definition().application().runtimeDescriptorFactory().get())); + childScope.setValue("workflow", () -> JsonUtils.fromValue(WorkflowDescriptor.of(workflow))); + } return childScope; } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/json/JsonUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/json/JsonUtils.java index 0726c2be..37d5c668 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/json/JsonUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/json/JsonUtils.java @@ -34,6 +34,9 @@ import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -87,6 +90,12 @@ public Supplier supplier() { }; } + public static OffsetDateTime toOffsetDateTime(JsonNode node) { + return node.isTextual() + ? OffsetDateTime.parse(node.asText()) + : OffsetDateTime.ofInstant(Instant.ofEpochMilli(node.asLong()), ZoneOffset.UTC); + } + /* * Implementation note: * Although we can use directly ObjectMapper.convertValue for implementing fromValue and toJavaValue methods, diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java b/impl/core/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java new file mode 100644 index 00000000..8dcba7ab --- /dev/null +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java @@ -0,0 +1,109 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.impl.json.JsonUtils; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +public class EventDefinitionTest { + + private static WorkflowApplication appl; + + @BeforeAll + static void init() { + appl = WorkflowApplication.builder().build(); + } + + @ParameterizedTest + @MethodSource("eventListenerParameters") + void testEventListened(String listen, String emit, JsonNode expectedResult, Object emitInput) + throws IOException { + WorkflowDefinition listenDefinition = + appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath(listen)); + WorkflowDefinition emitDefinition = + appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath(emit)); + WorkflowInstance waitingInstance = listenDefinition.instance(Map.of()); + CompletableFuture future = waitingInstance.start(); + assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.RUNNING); + emitDefinition.instance(emitInput).start().join(); + assertThat(future).isCompleted(); + assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.COMPLETED); + assertThat(waitingInstance.outputAsJsonNode()).isEqualTo(expectedResult); + } + + @Test + void testUntilConsumed() throws IOException { + WorkflowDefinition listenDefinition = + appl.workflowDefinition( + WorkflowReader.readWorkflowFromClasspath("listen-to-any-until-consumed.yaml")); + WorkflowDefinition emitDoctorDefinition = + appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("emit-doctor.yaml")); + WorkflowDefinition emitOutDefinition = + appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("emit-out.yaml")); + WorkflowInstance waitingInstance = listenDefinition.instance(Map.of()); + CompletableFuture future = waitingInstance.start(); + assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.RUNNING); + emitDoctorDefinition.instance(Map.of("temperature", 35)).start().join(); + assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.RUNNING); + emitDoctorDefinition.instance(Map.of("temperature", 39)).start().join(); + assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.RUNNING); + emitOutDefinition.instance(Map.of()).start().join(); + assertThat(future).isCompleted(); + assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.COMPLETED); + } + + private static Stream eventListenerParameters() { + return Stream.of( + Arguments.of("listen-to-any.yaml", "emit.yaml", cruellaDeVil(), Map.of()), + Arguments.of( + "listen-to-any-filter.yaml", "emit-doctor.yaml", doctor(), Map.of("temperature", 39))); + } + + private static JsonNode cruellaDeVil() { + ObjectMapper mapper = JsonUtils.mapper(); + ObjectNode node = mapper.createObjectNode(); + node.set( + "client", mapper.createObjectNode().put("firstName", "Cruella").put("lastName", "de Vil")); + node.set( + "items", + mapper + .createArrayNode() + .add(mapper.createObjectNode().put("breed", "dalmatian").put("quantity", 101))); + return mapper.createArrayNode().add(node); + } + + private static JsonNode doctor() { + ObjectMapper mapper = JsonUtils.mapper(); + ObjectNode node = mapper.createObjectNode(); + node.put("temperature", 39); + node.put("isSick", true); + return mapper.createArrayNode().add(node); + } +} diff --git a/impl/core/src/test/resources/emit-doctor.yaml b/impl/core/src/test/resources/emit-doctor.yaml new file mode 100644 index 00000000..b940b9cd --- /dev/null +++ b/impl/core/src/test/resources/emit-doctor.yaml @@ -0,0 +1,14 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: emit-doctor + version: '0.1.0' +do: + - emitEvent: + emit: + event: + with: + source: https://hospital.com + type: com.fake-hospital.vitals.measurements.temperature + data: + temperature: ${.temperature} \ No newline at end of file diff --git a/impl/core/src/test/resources/emit-out.yaml b/impl/core/src/test/resources/emit-out.yaml new file mode 100644 index 00000000..41582f34 --- /dev/null +++ b/impl/core/src/test/resources/emit-out.yaml @@ -0,0 +1,12 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: emit-out + version: '0.1.0' +do: + - emitEvent: + emit: + event: + with: + source: https://hospital.com + type: com.fake-hospital.patient.checked-out \ No newline at end of file diff --git a/impl/core/src/test/resources/emit.yaml b/impl/core/src/test/resources/emit.yaml new file mode 100644 index 00000000..d4d6d559 --- /dev/null +++ b/impl/core/src/test/resources/emit.yaml @@ -0,0 +1,19 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: emit + version: '0.1.0' +do: + - emitEvent: + emit: + event: + with: + source: https://petstore.com + type: com.petstore.order.placed.v1 + data: + client: + firstName: Cruella + lastName: de Vil + items: + - breed: dalmatian + quantity: 101 \ No newline at end of file diff --git a/impl/core/src/test/resources/listen-to-any-filter.yaml b/impl/core/src/test/resources/listen-to-any-filter.yaml new file mode 100644 index 00000000..49185870 --- /dev/null +++ b/impl/core/src/test/resources/listen-to-any-filter.yaml @@ -0,0 +1,25 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: listen-to-any-filter + version: '0.1.0' +do: + - callDoctor: + listen: + to: + any: + - with: + type: com.fake-hospital.vitals.measurements.temperature + data: ${ .temperature > 38 } + - with: + type: com.fake-hospital.vitals.measurements.bpm + data: ${ .bpm < 60 or .bpm > 100 } + until: ( . | length ) > 0 + foreach: + item: event + do: + - isSick: + set: + temperature: ${$event.temperature} + isSick: true + \ No newline at end of file diff --git a/impl/core/src/test/resources/listen-to-any-until-consumed.yaml b/impl/core/src/test/resources/listen-to-any-until-consumed.yaml new file mode 100644 index 00000000..62f04d2d --- /dev/null +++ b/impl/core/src/test/resources/listen-to-any-until-consumed.yaml @@ -0,0 +1,20 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: listen-to-any-until-consumed + version: '0.1.0' +do: + - callDoctor: + listen: + to: + any: + - with: + type: com.fake-hospital.vitals.measurements.temperature + data: ${ .temperature > 38 } + - with: + type: com.fake-hospital.vitals.measurements.bpm + data: ${ .bpm < 60 or .bpm > 100 } + until: + one: + with: + type: com.fake-hospital.patient.checked-out \ No newline at end of file diff --git a/impl/core/src/test/resources/listen-to-any.yaml b/impl/core/src/test/resources/listen-to-any.yaml new file mode 100644 index 00000000..b4a9fcb9 --- /dev/null +++ b/impl/core/src/test/resources/listen-to-any.yaml @@ -0,0 +1,10 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: listen-to-any + version: '0.1.0' +do: + - callDoctor: + listen: + to: + any: [] \ No newline at end of file diff --git a/impl/pom.xml b/impl/pom.xml index 802d4fed..0f0a224d 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -9,6 +9,9 @@ pom 3.1.10 + 4.0.1 + 1.2.0 + 5.2.3 @@ -32,7 +35,27 @@ jersey-media-json-jackson ${version.org.glassfish.jersey} - + + io.cloudevents + cloudevents-api + ${version.io.cloudevents} + + + io.cloudevents + cloudevents-json-jackson + ${version.io.cloudevents} + + + net.thisptr + jackson-jq + ${version.net.thisptr} + + + com.github.f4b6a3 + ulid-creator + ${version.com.github.f4b6a3} + + http