16
16
package io.serverlessworkflow.impl;
17
17
18
18
import static io.serverlessworkflow.impl.WorkflowUtils.*;
19
- import static io.serverlessworkflow.impl.json.JsonUtils.*;
20
19
21
- import com.fasterxml.jackson.databind.JsonNode;
22
20
import io.serverlessworkflow.api.types.Input;
23
21
import io.serverlessworkflow.api.types.Output;
24
22
import io.serverlessworkflow.api.types.TaskBase;
25
- import io.serverlessworkflow.api.types.TaskItem;
26
23
import io.serverlessworkflow.api.types.Workflow;
27
24
import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory;
28
25
import io.serverlessworkflow.impl.executors.TaskExecutor;
34
31
import io.serverlessworkflow.impl.jsonschema.SchemaValidator;
35
32
import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory;
36
33
import io.serverlessworkflow.resources.DefaultResourceLoaderFactory;
34
+ import io.serverlessworkflow.resources.ResourceLoader;
37
35
import io.serverlessworkflow.resources.ResourceLoaderFactory;
38
36
import java.nio.file.Path;
39
37
import java.util.Collection;
40
38
import java.util.Collections;
41
39
import java.util.HashSet;
42
- import java.util.List;
43
40
import java.util.Map;
44
41
import java.util.Optional;
45
42
import java.util.concurrent.ConcurrentHashMap;
46
43
47
44
public class WorkflowDefinition {
48
45
46
+ private final Workflow workflow;
47
+ private final Collection<WorkflowExecutionListener> listeners;
48
+ private Optional<SchemaValidator> inputSchemaValidator = Optional.empty();
49
+ private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
50
+ private Optional<WorkflowFilter> inputFilter = Optional.empty();
51
+ private Optional<WorkflowFilter> outputFilter = Optional.empty();
52
+ private final TaskExecutorFactory taskFactory;
53
+ private final ExpressionFactory exprFactory;
54
+ private final ResourceLoader resourceLoader;
55
+ private final SchemaValidatorFactory schemaValidatorFactory;
56
+ private final Map<String, TaskExecutor<? extends TaskBase>> taskExecutors =
57
+ new ConcurrentHashMap<>();
58
+
49
59
private WorkflowDefinition(
50
60
Workflow workflow,
51
61
Collection<WorkflowExecutionListener> listeners,
52
- WorkflowFactories factories) {
62
+ TaskExecutorFactory taskFactory,
63
+ ResourceLoader resourceLoader,
64
+ ExpressionFactory exprFactory,
65
+ SchemaValidatorFactory schemaValidatorFactory) {
53
66
this.workflow = workflow;
54
67
this.listeners = listeners;
55
- this.factories = factories;
68
+ this.taskFactory = taskFactory;
69
+ this.exprFactory = exprFactory;
70
+ this.schemaValidatorFactory = schemaValidatorFactory;
71
+ this.resourceLoader = resourceLoader;
56
72
if (workflow.getInput() != null) {
57
73
Input input = workflow.getInput();
58
74
this.inputSchemaValidator =
59
75
getSchemaValidator(
60
- factories.getValidatorFactory() , schemaToNode(factories , input.getSchema()));
61
- this.inputFilter = buildWorkflowFilter(factories.getExpressionFactory() , input.getFrom());
76
+ schemaValidatorFactory , schemaToNode(resourceLoader , input.getSchema()));
77
+ this.inputFilter = buildWorkflowFilter(exprFactory , input.getFrom());
62
78
}
63
79
if (workflow.getOutput() != null) {
64
80
Output output = workflow.getOutput();
65
81
this.outputSchemaValidator =
66
82
getSchemaValidator(
67
- factories.getValidatorFactory() , schemaToNode(factories , output.getSchema()));
68
- this.outputFilter = buildWorkflowFilter(factories.getExpressionFactory() , output.getAs());
83
+ schemaValidatorFactory , schemaToNode(resourceLoader , output.getSchema()));
84
+ this.outputFilter = buildWorkflowFilter(exprFactory , output.getAs());
69
85
}
70
86
}
71
87
72
- private final Workflow workflow;
73
- private final Collection<WorkflowExecutionListener> listeners;
74
- private final WorkflowFactories factories;
75
- private Optional<SchemaValidator> inputSchemaValidator = Optional.empty();
76
- private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
77
- private Optional<WorkflowFilter> inputFilter = Optional.empty();
78
- private Optional<WorkflowFilter> outputFilter = Optional.empty();
79
-
80
- private final Map<String, TaskExecutor<? extends TaskBase>> taskExecutors =
81
- new ConcurrentHashMap<>();
82
-
83
88
public static class Builder {
84
89
private final Workflow workflow;
85
90
private TaskExecutorFactory taskFactory = DefaultTaskExecutorFactory.get();
@@ -127,18 +132,15 @@ public Builder withSchemaValidatorFactory(SchemaValidatorFactory factory) {
127
132
}
128
133
129
134
public WorkflowDefinition build() {
130
- WorkflowDefinition def =
131
- new WorkflowDefinition(
132
- workflow,
133
- listeners == null
134
- ? Collections.emptySet()
135
- : Collections.unmodifiableCollection(listeners),
136
- new WorkflowFactories(
137
- taskFactory,
138
- resourceLoaderFactory.getResourceLoader(path),
139
- exprFactory,
140
- schemaValidatorFactory));
141
- return def;
135
+ return new WorkflowDefinition(
136
+ workflow,
137
+ listeners == null
138
+ ? Collections.emptySet()
139
+ : Collections.unmodifiableCollection(listeners),
140
+ taskFactory,
141
+ resourceLoaderFactory.getResourceLoader(path),
142
+ exprFactory,
143
+ schemaValidatorFactory);
142
144
}
143
145
}
144
146
@@ -147,7 +149,7 @@ public static Builder builder(Workflow workflow) {
147
149
}
148
150
149
151
public WorkflowInstance execute(Object input) {
150
- return new WorkflowInstance(JsonUtils.fromValue(input));
152
+ return new WorkflowInstance(this, JsonUtils.fromValue(input));
151
153
}
152
154
153
155
enum State {
@@ -156,50 +158,48 @@ enum State {
156
158
FINISHED
157
159
};
158
160
159
- public class WorkflowInstance {
160
-
161
- private JsonNode output;
162
- private State state;
163
- private WorkflowContext context;
164
-
165
- private WorkflowInstance(JsonNode input) {
166
- this.output = input;
167
- inputSchemaValidator.ifPresent(v -> v.validate(input));
168
- this.context = WorkflowContext.builder(input).build();
169
- inputFilter.ifPresent(f -> output = f.apply(context, Optional.empty(), output));
170
- this.state = State.STARTED;
171
- processDo(workflow.getDo());
172
- outputFilter.ifPresent(f -> output = f.apply(context, Optional.empty(), output));
173
- outputSchemaValidator.ifPresent(v -> v.validate(output));
174
- }
161
+ public Optional<SchemaValidator> inputSchemaValidator() {
162
+ return inputSchemaValidator;
163
+ }
175
164
176
- private void processDo(List<TaskItem> tasks) {
177
- context.position().addProperty("do");
178
- int index = 0;
179
- for (TaskItem task : tasks) {
180
- context.position().addIndex(++index).addProperty(task.getName());
181
- listeners.forEach(l -> l.onTaskStarted(context.position(), task.getTask()));
182
- this.output =
183
- taskExecutors
184
- .computeIfAbsent(
185
- context.position().jsonPointer(),
186
- k -> factories.getTaskFactory().getTaskExecutor(task.getTask(), factories))
187
- .apply(context, output);
188
- listeners.forEach(l -> l.onTaskEnded(context.position(), task.getTask()));
189
- context.position().back().back();
190
- }
191
- }
165
+ public Optional<WorkflowFilter> inputFilter() {
166
+ return inputFilter;
167
+ }
192
168
193
- public State state () {
194
- return state ;
195
- }
169
+ public Workflow workflow () {
170
+ return workflow ;
171
+ }
196
172
197
- public Object output () {
198
- return toJavaValue(output) ;
199
- }
173
+ public Collection<WorkflowExecutionListener> listeners () {
174
+ return listeners ;
175
+ }
200
176
201
- public Object outputAsJsonNode() {
202
- return output;
203
- }
177
+ public Map<String, TaskExecutor<? extends TaskBase>> taskExecutors() {
178
+ return taskExecutors;
179
+ }
180
+
181
+ public TaskExecutorFactory taskFactory() {
182
+ return taskFactory;
183
+ }
184
+
185
+ public Optional<WorkflowFilter> outputFilter() {
186
+ return outputFilter;
187
+ }
188
+
189
+ public Optional<SchemaValidator> outputSchemaValidator() {
190
+ return outputSchemaValidator;
191
+ }
192
+
193
+ public ExpressionFactory expressionFactory() {
194
+ return exprFactory;
195
+ }
196
+
197
+ public SchemaValidatorFactory validatorFactory() {
198
+ return schemaValidatorFactory;
199
+ }
200
+
201
+ public ResourceLoader resourceLoader() {
202
+
203
+ return resourceLoader;
204
204
}
205
205
}
0 commit comments