Skip to content

Commit 72f03d8

Browse files
committed
[Fix #484] Removing SortedArrayList
Maybe for larger list that has to be kept sorte that one is useful, but in this case is faster to add everything unsorted (all insertions are really fast) and then sort (there is only one sort of a small amount of items) Signed-off-by: Francisco Javier Tirado Sarti <[email protected]>
1 parent 74fc958 commit 72f03d8

File tree

13 files changed

+201
-177
lines changed

13 files changed

+201
-177
lines changed

impl/core/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,10 @@
5050
<artifactId>assertj-core</artifactId>
5151
<scope>test</scope>
5252
</dependency>
53+
<dependency>
54+
<groupId>ch.qos.logback</groupId>
55+
<artifactId>logback-classic</artifactId>
56+
<scope>test</scope>
57+
</dependency>
5358
</dependencies>
5459
</project>
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import java.util.function.BiFunction;
19+
20+
@FunctionalInterface
21+
public interface LongFilter extends BiFunction<WorkflowContext, TaskContext<?>, Long> {}

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,6 @@ public TaskContext(JsonNode input, WorkflowPosition position) {
4141
this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>());
4242
}
4343

44-
public TaskContext<T> copy() {
45-
return new TaskContext<T>(
46-
rawInput,
47-
task,
48-
position.copy(),
49-
startedAt,
50-
input,
51-
output,
52-
rawOutput,
53-
flowDirective,
54-
new HashMap<>(contextVariables));
55-
}
56-
5744
public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
5845
this(
5946
input,
@@ -88,6 +75,19 @@ private TaskContext(
8875
this.contextVariables = contextVariables;
8976
}
9077

78+
public TaskContext<T> copy() {
79+
return new TaskContext<T>(
80+
rawInput,
81+
task,
82+
position.copy(),
83+
startedAt,
84+
input,
85+
output,
86+
rawOutput,
87+
flowDirective,
88+
new HashMap<>(contextVariables));
89+
}
90+
9191
public void input(JsonNode input) {
9292
this.input = input;
9393
this.rawOutput = input;

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,18 +94,22 @@ public static Optional<WorkflowFilter> buildWorkflowFilter(
9494

9595
public static StringFilter buildStringFilter(
9696
ExpressionFactory exprFactory, String expression, String literal) {
97-
return expression != null ? from(buildWorkflowFilter(exprFactory, expression)) : from(literal);
97+
return expression != null
98+
? toString(buildWorkflowFilter(exprFactory, expression))
99+
: toString(literal);
98100
}
99101

100102
public static StringFilter buildStringFilter(ExpressionFactory exprFactory, String str) {
101-
return ExpressionUtils.isExpr(str) ? from(buildWorkflowFilter(exprFactory, str)) : from(str);
103+
return ExpressionUtils.isExpr(str)
104+
? toString(buildWorkflowFilter(exprFactory, str))
105+
: toString(str);
102106
}
103107

104-
public static StringFilter from(WorkflowFilter filter) {
108+
public static StringFilter toString(WorkflowFilter filter) {
105109
return (w, t) -> filter.apply(w, t, t.input()).asText();
106110
}
107111

108-
private static StringFilter from(String literal) {
112+
private static StringFilter toString(String literal) {
109113
return (w, t) -> literal;
110114
}
111115

@@ -124,6 +128,21 @@ private static WorkflowFilter buildWorkflowFilter(
124128
throw new IllegalStateException("Both object and str are null");
125129
}
126130

131+
public static LongFilter buildLongFilter(
132+
ExpressionFactory exprFactory, String expression, Long literal) {
133+
return expression != null
134+
? toLong(buildWorkflowFilter(exprFactory, expression))
135+
: toLong(literal);
136+
}
137+
138+
public static LongFilter toLong(WorkflowFilter filter) {
139+
return (w, t) -> filter.apply(w, t, t.input()).asLong();
140+
}
141+
142+
private static LongFilter toLong(Long literal) {
143+
return (w, t) -> literal;
144+
}
145+
127146
private static TaskItem findTaskByName(ListIterator<TaskItem> iter, String taskName) {
128147
int currentIndex = iter.nextIndex();
129148
while (iter.hasPrevious()) {

impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ public TaskExecutor<? extends TaskBase> getTaskExecutor(
7373
return new TryExecutor(task.getTryTask(), definition);
7474
} else if (task.getForkTask() != null) {
7575
return new ForkExecutor(task.getForkTask(), definition);
76+
} else if (task.getWaitTask() != null) {
77+
return new WaitExecutor(task.getWaitTask(), definition);
7678
}
7779
throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet");
7880
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.serverlessworkflow.impl.executors;
1717

18+
import com.fasterxml.jackson.databind.JsonNode;
1819
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
1920
import io.serverlessworkflow.api.types.ForkTask;
2021
import io.serverlessworkflow.api.types.ForkTaskConfiguration;
@@ -24,16 +25,16 @@
2425
import io.serverlessworkflow.impl.WorkflowDefinition;
2526
import io.serverlessworkflow.impl.WorkflowState;
2627
import io.serverlessworkflow.impl.WorkflowUtils;
27-
import io.serverlessworkflow.impl.generic.SortedArrayList;
2828
import io.serverlessworkflow.impl.json.JsonUtils;
2929
import java.lang.reflect.UndeclaredThrowableException;
30+
import java.util.ArrayList;
3031
import java.util.HashMap;
3132
import java.util.List;
3233
import java.util.Map;
3334
import java.util.concurrent.ExecutionException;
3435
import java.util.concurrent.ExecutorService;
3536
import java.util.concurrent.Future;
36-
import java.util.stream.Collectors;
37+
import java.util.stream.Stream;
3738
import org.slf4j.Logger;
3839
import org.slf4j.LoggerFactory;
3940

@@ -47,8 +48,6 @@ protected ForkExecutor(ForkTask task, WorkflowDefinition definition) {
4748
service = definition.executorService();
4849
}
4950

50-
private record BranchContext(String taskName, TaskContext<?> taskContext) {}
51-
5251
@Override
5352
protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> taskContext) {
5453
ForkTaskConfiguration forkConfig = task.getFork();
@@ -62,13 +61,10 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> t
6261
item.getName(),
6362
service.submit(() -> executeBranch(workflow, taskContext.copy(), item, i)));
6463
}
65-
List<BranchContext> results =
66-
new SortedArrayList<>(
67-
(arg1, arg2) ->
68-
arg1.taskContext.completedAt().compareTo(arg2.taskContext.completedAt()));
64+
List<Map.Entry<String, TaskContext<?>>> results = new ArrayList<>();
6965
for (Map.Entry<String, Future<TaskContext<?>>> entry : futures.entrySet()) {
7066
try {
71-
results.add(new BranchContext(entry.getKey(), entry.getValue().get()));
67+
results.add(Map.entry(entry.getKey(), entry.getValue().get()));
7268
} catch (ExecutionException ex) {
7369
Throwable cause = ex.getCause();
7470
if (cause instanceof RuntimeException) {
@@ -77,24 +73,25 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> t
7773
throw new UndeclaredThrowableException(ex);
7874
}
7975
} catch (InterruptedException ex) {
80-
logger.warn(
81-
"Thred executing branch {} was interrupted, this branch will be ignored",
82-
entry.getKey(),
83-
ex);
76+
logger.warn("Branch {} was interrupted, no result will be recorded", entry.getKey(), ex);
8477
}
8578
}
8679
if (!results.isEmpty()) {
80+
Stream<Map.Entry<String, TaskContext<?>>> sortedStream =
81+
results.stream()
82+
.sorted(
83+
(arg1, arg2) ->
84+
arg1.getValue().completedAt().compareTo(arg2.getValue().completedAt()));
8785
taskContext.rawOutput(
8886
forkConfig.isCompete()
89-
? results.get(0).taskContext().output()
90-
: JsonUtils.fromValue(
91-
results.stream()
92-
.map(
93-
e ->
94-
JsonUtils.mapper()
95-
.createObjectNode()
96-
.set(e.taskName(), e.taskContext().output()))
97-
.collect(Collectors.toList())));
87+
? sortedStream.map(e -> e.getValue().output()).findFirst().orElseThrow()
88+
: sortedStream
89+
.<JsonNode>map(
90+
e ->
91+
JsonUtils.mapper()
92+
.createObjectNode()
93+
.set(e.getKey(), e.getValue().output()))
94+
.collect(JsonUtils.arrayNodeCollector()));
9895
}
9996
}
10097
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors;
17+
18+
import io.serverlessworkflow.api.types.DurationInline;
19+
import io.serverlessworkflow.api.types.WaitTask;
20+
import io.serverlessworkflow.impl.TaskContext;
21+
import io.serverlessworkflow.impl.WorkflowContext;
22+
import io.serverlessworkflow.impl.WorkflowDefinition;
23+
import java.time.Duration;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
public class WaitExecutor extends AbstractTaskExecutor<WaitTask> {
28+
29+
private static Logger logger = LoggerFactory.getLogger(WaitExecutor.class);
30+
private final Duration millisToWait;
31+
32+
protected WaitExecutor(WaitTask task, WorkflowDefinition definition) {
33+
super(task, definition);
34+
this.millisToWait =
35+
task.getWait().getDurationInline() != null
36+
? toLong(task.getWait().getDurationInline())
37+
: Duration.parse(task.getWait().getDurationExpression());
38+
}
39+
40+
private Duration toLong(DurationInline durationInline) {
41+
Duration duration = Duration.ofMillis(durationInline.getMilliseconds());
42+
duration.plus(Duration.ofSeconds(durationInline.getSeconds()));
43+
duration.plus(Duration.ofMinutes(durationInline.getMinutes()));
44+
duration.plus(Duration.ofHours(durationInline.getHours()));
45+
duration.plus(Duration.ofDays(durationInline.getDays()));
46+
return duration;
47+
}
48+
49+
@Override
50+
protected void internalExecute(WorkflowContext workflow, TaskContext<WaitTask> taskContext) {
51+
try {
52+
Thread.sleep(millisToWait.toMillis());
53+
} catch (InterruptedException e) {
54+
logger.warn("Waiting thread was interrupted", e);
55+
Thread.currentThread().interrupt();
56+
}
57+
}
58+
}

impl/core/src/main/java/io/serverlessworkflow/impl/generic/SortedArrayList.java

Lines changed: 0 additions & 67 deletions
This file was deleted.

0 commit comments

Comments
 (0)