Skip to content

Commit f127c03

Browse files
javier-aliagacicoyleartur-ciocanu
committed
chore: New task execution task id test (#1352)
* chore: New task execution task id test test how taskExecutionTaskId can be used for idempotency Signed-off-by: Javier Aliaga <[email protected]> * chore: Clean up not used files Signed-off-by: Javier Aliaga <[email protected]> * docs: Task execution keys Signed-off-by: Javier Aliaga <[email protected]> * test: Modify unit tests Signed-off-by: Javier Aliaga <[email protected]> * Remove new lines Signed-off-by: artur-ciocanu <[email protected]> --------- Signed-off-by: Javier Aliaga <[email protected]> Signed-off-by: artur-ciocanu <[email protected]> Co-authored-by: Cassie Coyle <[email protected]> Co-authored-by: artur-ciocanu <[email protected]> Signed-off-by: Javier Aliaga <[email protected]>
1 parent d8ae74f commit f127c03

File tree

11 files changed

+275
-38
lines changed

11 files changed

+275
-38
lines changed

daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ weight: 20000
66
description: How to get up and running with workflows using the Dapr Java SDK
77
---
88

9-
Lets create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will:
9+
Let's create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will:
1010

1111
- Execute the workflow instance using the [Java workflow worker](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java)
1212
- Utilize the Java workflow client and API calls to [start and terminate workflow instances](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java)
@@ -85,11 +85,10 @@ You're up and running! Both Dapr and your app logs will appear here.
8585
== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001.
8686
```
8787

88-
## Run the `DemoWorkflowClient
88+
## Run the `DemoWorkflowClient`
8989

9090
The `DemoWorkflowClient` starts instances of workflows that have been registered with Dapr.
9191

92-
9392
```java
9493
public class DemoWorkflowClient {
9594

@@ -246,4 +245,40 @@ Exiting DemoWorkflowClient.
246245

247246
## Next steps
248247
- [Learn more about Dapr workflow]({{% ref workflow-overview.md %}})
249-
- [Workflow API reference]({{% ref workflow_api.md %}})
248+
- [Workflow API reference]({{% ref workflow_api.md %}})
249+
250+
## Advanced features
251+
252+
### Task Execution Keys
253+
254+
Task execution keys are unique identifiers generated by the durabletask-java library. They are stored in the `WorkflowActivityContext` and can be used to track and manage the execution of workflow activities. They are particularly useful for:
255+
256+
1. **Idempotency**: Ensuring activities are not executed multiple times for the same task
257+
2. **State Management**: Tracking the state of activity execution
258+
3. **Error Handling**: Managing retries and failures in a controlled manner
259+
260+
Here's an example of how to use task execution keys in your workflow activities:
261+
262+
```java
263+
public class TaskExecutionKeyActivity implements WorkflowActivity {
264+
@Override
265+
public Object run(WorkflowActivityContext ctx) {
266+
// Get the task execution key for this activity
267+
String taskExecutionKey = ctx.getTaskExecutionKey();
268+
269+
// Use the key to implement idempotency or state management
270+
// For example, check if this task has already been executed
271+
if (isTaskAlreadyExecuted(taskExecutionKey)) {
272+
return getPreviousResult(taskExecutionKey);
273+
}
274+
275+
// Execute the activity logic
276+
Object result = executeActivityLogic();
277+
278+
// Store the result with the task execution key
279+
storeResult(taskExecutionKey, result);
280+
281+
return result;
282+
}
283+
}
284+
```

pom.xml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
<snakeyaml.version>2.0</snakeyaml.version>
5050
<testcontainers.version>1.21.3</testcontainers.version>
5151
<!-- Do NOT UPGRADE spring.version without checking springboot.version alignment -->
52-
<springboot.version>3.4.6</springboot.version>
52+
<springboot.version>3.4.9</springboot.version>
5353
<springframework.version>6.2.7</springframework.version>
5454
<!-- Do NOT UPGRADE springframework.version without checking springboot.version alignment -->
5555
<nexus-staging-maven-plugin.version>1.7.0</nexus-staging-maven-plugin.version>
@@ -65,6 +65,8 @@
6565
<commons-io.version>2.14.0</commons-io.version>
6666
<zipkin.version>3.4.0</zipkin.version>
6767
<microcks.version>0.3.1</microcks.version>
68+
<commons-compress.version>1.26.0</commons-compress.version>
69+
<commons-codec.version>1.17.0</commons-codec.version>
6870
</properties>
6971

7072
<distributionManagement>
@@ -372,6 +374,17 @@
372374
<artifactId>wiremock-standalone</artifactId>
373375
<version>${wiremock.version}</version>
374376
</dependency>
377+
<dependency>
378+
<groupId>org.apache.commons</groupId>
379+
<artifactId>commons-compress</artifactId>
380+
<version>${commons-compress.version}</version>
381+
</dependency>
382+
<dependency>
383+
<groupId>commons-codec</groupId>
384+
<artifactId>commons-codec</artifactId>
385+
<version>${commons-codec.version}</version>
386+
<scope>testf</scope>
387+
</dependency>
375388
</dependencies>
376389
</dependencyManagement>
377390

sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIT.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.fasterxml.jackson.core.JsonProcessingException;
1717
import com.fasterxml.jackson.databind.ObjectMapper;
18+
1819
import io.dapr.testcontainers.Component;
1920
import io.dapr.testcontainers.DaprContainer;
2021
import io.dapr.testcontainers.DaprLogLevel;
@@ -41,6 +42,7 @@
4142
import java.util.Map;
4243

4344
import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG;
45+
import static org.junit.Assert.assertTrue;
4446
import static org.junit.jupiter.api.Assertions.assertEquals;
4547
import static org.junit.jupiter.api.Assertions.assertNotNull;
4648

@@ -153,7 +155,7 @@ public void testNamedActivitiesWorkflows() throws Exception {
153155
String instanceId = workflowClient.scheduleNewWorkflow(TestNamedActivitiesWorkflow.class, payload);
154156

155157
workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false);
156-
158+
157159
Duration timeout = Duration.ofSeconds(10);
158160
WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true);
159161

@@ -171,6 +173,28 @@ public void testNamedActivitiesWorkflows() throws Exception {
171173
assertEquals(instanceId, workflowOutput.getWorkflowId());
172174
}
173175

176+
@Test
177+
public void testExecutionKeyWorkflows() throws Exception {
178+
TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>());
179+
String instanceId = workflowClient.scheduleNewWorkflow(TestExecutionKeysWorkflow.class, payload);
180+
181+
workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(100), false);
182+
183+
Duration timeout = Duration.ofSeconds(1000);
184+
WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true);
185+
186+
assertNotNull(workflowStatus);
187+
188+
TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput());
189+
190+
assertEquals(1, workflowOutput.getPayloads().size());
191+
assertEquals("Execution key found", workflowOutput.getPayloads().get(0));
192+
193+
assertTrue(KeyStore.getInstance().size() == 1);
194+
195+
assertEquals(instanceId, workflowOutput.getWorkflowId());
196+
}
197+
174198
private TestWorkflowPayload deserialize(String value) throws JsonProcessingException {
175199
return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class);
176200
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.dapr.it.testcontainers.workflows;
14+
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
18+
public class KeyStore {
19+
20+
private final Map<String, Boolean> keyStore = new HashMap<>();
21+
22+
private static KeyStore instance;
23+
24+
private KeyStore() {
25+
}
26+
27+
public static KeyStore getInstance() {
28+
if (instance == null) {
29+
synchronized (KeyStore.class) {
30+
if (instance == null) {
31+
instance = new KeyStore();
32+
}
33+
}
34+
}
35+
return instance;
36+
}
37+
38+
39+
public void addKey(String key, Boolean value) {
40+
keyStore.put(key, value);
41+
}
42+
43+
public Boolean getKey(String key) {
44+
return keyStore.get(key);
45+
}
46+
47+
public void removeKey(String key) {
48+
keyStore.remove(key);
49+
}
50+
51+
public int size() {
52+
return keyStore.size();
53+
}
54+
55+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.it.testcontainers.workflows;
15+
16+
import io.dapr.workflows.WorkflowActivity;
17+
import io.dapr.workflows.WorkflowActivityContext;
18+
19+
public class TaskExecutionIdActivity implements WorkflowActivity {
20+
21+
@Override
22+
public Object run(WorkflowActivityContext ctx) {
23+
TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class);
24+
KeyStore keyStore = KeyStore.getInstance();
25+
Boolean exists = keyStore.getKey(ctx.getTaskExecutionId());
26+
if (!Boolean.TRUE.equals(exists)) {
27+
keyStore.addKey(ctx.getTaskExecutionId(), true);
28+
workflowPayload.getPayloads().add("Execution key not found");
29+
throw new IllegalStateException("Task execution key not found");
30+
}
31+
workflowPayload.getPayloads().add("Execution key found");
32+
return workflowPayload;
33+
}
34+
35+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.it.testcontainers.workflows;
15+
16+
import io.dapr.durabletask.Task;
17+
import io.dapr.workflows.Workflow;
18+
import io.dapr.workflows.WorkflowStub;
19+
import io.dapr.workflows.WorkflowTaskOptions;
20+
import io.dapr.workflows.WorkflowTaskRetryPolicy;
21+
22+
import java.time.Duration;
23+
24+
import org.slf4j.Logger;
25+
26+
public class TestExecutionKeysWorkflow implements Workflow {
27+
28+
@Override
29+
public WorkflowStub create() {
30+
return ctx -> {
31+
32+
Logger logger = ctx.getLogger();
33+
String instanceId = ctx.getInstanceId();
34+
logger.info("Starting Workflow: " + ctx.getName());
35+
logger.info("Instance ID: " + instanceId);
36+
logger.info("Current Orchestration Time: " + ctx.getCurrentInstant());
37+
38+
TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class);
39+
workflowPayload.setWorkflowId(instanceId);
40+
41+
WorkflowTaskOptions options = new WorkflowTaskOptions(WorkflowTaskRetryPolicy.newBuilder()
42+
.setMaxNumberOfAttempts(3)
43+
.setFirstRetryInterval(Duration.ofSeconds(1))
44+
.setMaxRetryInterval(Duration.ofSeconds(10))
45+
.setBackoffCoefficient(2.0)
46+
.setRetryTimeout(Duration.ofSeconds(50))
47+
.build());
48+
49+
50+
Task<TestWorkflowPayload> t = ctx.callActivity(TaskExecutionIdActivity.class.getName(), workflowPayload, options,TestWorkflowPayload.class);
51+
52+
TestWorkflowPayload payloadAfterExecution = t.await();
53+
54+
ctx.complete(payloadAfterExecution);
55+
};
56+
}
57+
58+
}

sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -50,40 +50,41 @@ public WorkflowRuntimeBuilder workflowRuntimeBuilder(
5050
@Value("${dapr.http.endpoint}") String daprHttpEndpoint,
5151
@Value("${dapr.grpc.endpoint}") String daprGrpcEndpoint
5252
){
53-
Map<String, String> overrides = Map.of(
54-
"dapr.http.endpoint", daprHttpEndpoint,
55-
"dapr.grpc.endpoint", daprGrpcEndpoint
56-
);
57-
58-
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(new Properties(overrides));
53+
Map<String, String> overrides = Map.of(
54+
"dapr.http.endpoint", daprHttpEndpoint,
55+
"dapr.grpc.endpoint", daprGrpcEndpoint
56+
);
5957

60-
builder.registerWorkflow(TestWorkflow.class);
61-
builder.registerWorkflow(TestNamedActivitiesWorkflow.class);
58+
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(new Properties(overrides));
6259

63-
builder.registerActivity(FirstActivity.class);
64-
builder.registerActivity(SecondActivity.class);
65-
builder.registerActivity("a",FirstActivity.class);
66-
builder.registerActivity("b",FirstActivity.class);
67-
builder.registerActivity("c", new SecondActivity());
68-
builder.registerActivity("d", new WorkflowActivity() {
69-
@Override
70-
public Object run(WorkflowActivityContext ctx) {
71-
TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class);
72-
workflowPayload.getPayloads().add("Anonymous Activity");
73-
return workflowPayload;
74-
}
75-
});
76-
builder.registerActivity("e", new WorkflowActivity() {
77-
@Override
78-
public Object run(WorkflowActivityContext ctx) {
79-
TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class);
80-
workflowPayload.getPayloads().add("Anonymous Activity 2");
81-
return workflowPayload;
82-
}
83-
});
60+
builder.registerWorkflow(TestWorkflow.class);
61+
builder.registerWorkflow(TestExecutionKeysWorkflow.class);
62+
builder.registerWorkflow(TestNamedActivitiesWorkflow.class);
8463

64+
builder.registerActivity(FirstActivity.class);
65+
builder.registerActivity(SecondActivity.class);
66+
builder.registerActivity(TaskExecutionIdActivity.class);
8567

68+
builder.registerActivity("a", FirstActivity.class);
69+
builder.registerActivity("b", FirstActivity.class);
70+
builder.registerActivity("c", new SecondActivity());
71+
builder.registerActivity("d", new WorkflowActivity() {
72+
@Override
73+
public Object run(WorkflowActivityContext ctx) {
74+
TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class);
75+
workflowPayload.getPayloads().add("Anonymous Activity");
76+
return workflowPayload;
77+
}
78+
});
79+
builder.registerActivity("e", new WorkflowActivity() {
80+
@Override
81+
public Object run(WorkflowActivityContext ctx) {
82+
TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class);
83+
workflowPayload.getPayloads().add("Anonymous Activity 2");
84+
return workflowPayload;
85+
}
86+
});
8687

87-
return builder;
88+
return builder;
8889
}
8990
}

sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public interface WorkflowActivityContext {
1717

1818
String getName();
1919

20+
String getTaskExecutionId();
21+
2022
<T> T getInput(Class<T> targetType);
2123

2224
}

sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,9 @@ public String getName() {
5656
public <T> T getInput(Class<T> targetType) {
5757
return this.innerContext.getInput(targetType);
5858
}
59+
60+
@Override
61+
public String getTaskExecutionId() {
62+
return this.innerContext.getTaskExecutionId();
63+
}
5964
}

0 commit comments

Comments
 (0)