Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk-workflows/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<dependency>
<groupId>io.dapr</groupId>
<artifactId>durabletask-client</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</dependency>
<!--
manually declare durabletask-client's jackson dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,27 @@
public class WorkflowTaskOptions {

private final WorkflowTaskRetryPolicy retryPolicy;
private final WorkflowTaskRetryHandler retryHandler;

public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy) {
public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy, WorkflowTaskRetryHandler retryHandler) {
this.retryPolicy = retryPolicy;
this.retryHandler = retryHandler;
}

public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy) {
this(retryPolicy, null);
}

public WorkflowTaskOptions(WorkflowTaskRetryHandler retryHandler) {
this(null, retryHandler);
}

public WorkflowTaskRetryPolicy getRetryPolicy() {
return retryPolicy;
}

public WorkflowTaskRetryHandler getRetryHandler() {
return retryHandler;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.workflows;

import io.dapr.workflows.client.WorkflowFailureDetails;
import io.dapr.workflows.runtime.DefaultWorkflowContext;

import java.time.Duration;

public class WorkflowTaskRetryContext {

private final DefaultWorkflowContext workflowContext;
private final int lastAttemptNumber;
private final WorkflowFailureDetails lastFailure;
private final Duration totalRetryTime;

/**
* Constructor for WorkflowTaskRetryContext.
*
* @param workflowContext The workflow context
* @param lastAttemptNumber The number of the previous attempt
* @param lastFailure The failure details from the most recent failure
* @param totalRetryTime The amount of time spent retrying
*/
public WorkflowTaskRetryContext(
DefaultWorkflowContext workflowContext,
int lastAttemptNumber,
WorkflowFailureDetails lastFailure,
Duration totalRetryTime) {
this.workflowContext = workflowContext;
this.lastAttemptNumber = lastAttemptNumber;
this.lastFailure = lastFailure;
this.totalRetryTime = totalRetryTime;
}

/**
* Gets the context of the current workflow.
*
* <p>The workflow context can be used in retry handlers to schedule timers (via the
* {@link DefaultWorkflowContext#createTimer} methods) for implementing delays between retries. It can also be
* used to implement time-based retry logic by using the {@link DefaultWorkflowContext#getCurrentInstant} method.
*
* @return the context of the parent workflow
*/
public DefaultWorkflowContext getWorkflowContext() {
return this.workflowContext;

Check warning on line 57 in sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryContext.java

View check run for this annotation

Codecov / codecov/patch

sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryContext.java#L57

Added line #L57 was not covered by tests
}

/**
* Gets the details of the previous task failure, including the exception type, message, and callstack.
*
* @return the details of the previous task failure
*/
public WorkflowFailureDetails getLastFailure() {
return this.lastFailure;

Check warning on line 66 in sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryContext.java

View check run for this annotation

Codecov / codecov/patch

sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryContext.java#L66

Added line #L66 was not covered by tests
}

/**
* Gets the previous retry attempt number. This number starts at 1 and increments each time the retry handler
* is invoked for a particular task failure.
*
* @return the previous retry attempt number
*/
public int getLastAttemptNumber() {
return this.lastAttemptNumber;

Check warning on line 76 in sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryContext.java

View check run for this annotation

Codecov / codecov/patch

sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryContext.java#L76

Added line #L76 was not covered by tests
}

/**
* Gets the total amount of time spent in a retry loop for the current task.
*
* @return the total amount of time spent in a retry loop for the current task
*/
public Duration getTotalRetryTime() {
return this.totalRetryTime;

Check warning on line 85 in sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryContext.java

View check run for this annotation

Codecov / codecov/patch

sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryContext.java#L85

Added line #L85 was not covered by tests
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.workflows;

public interface WorkflowTaskRetryHandler {

/**
* Invokes retry handler logic. Return value indicates whether to continue retrying.
*
* @param retryContext The context of the retry
* @return {@code true} to continue retrying or {@code false} to stop retrying.
*/
boolean handle(WorkflowTaskRetryContext retryContext);

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,14 @@
*/
String getStackTrace();

/**
* Checks whether the failure was caused by the provided exception class.
*
* @param exceptionClass the exception class to check
* @return {@code true} if the failure was caused by the provided exception class
*/
default boolean isCausedBy(Class<? extends Exception> exceptionClass) {
throw new UnsupportedOperationException("This method is not implemented");

Check warning on line 49 in sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowFailureDetails.java

View check run for this annotation

Codecov / codecov/patch

sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowFailureDetails.java#L49

Added line #L49 was not covered by tests
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
package io.dapr.workflows.runtime;

import io.dapr.durabletask.CompositeTaskFailedException;
import io.dapr.durabletask.RetryHandler;
import io.dapr.durabletask.RetryPolicy;
import io.dapr.durabletask.Task;
import io.dapr.durabletask.TaskCanceledException;
import io.dapr.durabletask.TaskOptions;
import io.dapr.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryContext;
import io.dapr.workflows.WorkflowTaskRetryHandler;
import io.dapr.workflows.WorkflowTaskRetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -228,22 +231,61 @@ public UUID newUuid() {
return this.innerContext.newUUID();
}

private static TaskOptions toTaskOptions(WorkflowTaskOptions options) {
private TaskOptions toTaskOptions(WorkflowTaskOptions options) {
if (options == null) {
return null;
}

WorkflowTaskRetryPolicy workflowTaskRetryPolicy = options.getRetryPolicy();
RetryPolicy retryPolicy = toRetryPolicy(options.getRetryPolicy());
RetryHandler retryHandler = toRetryHandler(options.getRetryHandler());

return new TaskOptions(retryPolicy, retryHandler);
}

/**
* Converts a {@link WorkflowTaskRetryPolicy} to a {@link RetryPolicy}.
*
* @param workflowTaskRetryPolicy The {@link WorkflowTaskRetryPolicy} being converted
* @return A {@link RetryPolicy}
*/
private RetryPolicy toRetryPolicy(WorkflowTaskRetryPolicy workflowTaskRetryPolicy) {
if (workflowTaskRetryPolicy == null) {
return null;
}

RetryPolicy retryPolicy = new RetryPolicy(
workflowTaskRetryPolicy.getMaxNumberOfAttempts(),
workflowTaskRetryPolicy.getFirstRetryInterval()
workflowTaskRetryPolicy.getMaxNumberOfAttempts(),
workflowTaskRetryPolicy.getFirstRetryInterval()
);

retryPolicy.setBackoffCoefficient(workflowTaskRetryPolicy.getBackoffCoefficient());
if (workflowTaskRetryPolicy.getRetryTimeout() != null) {
retryPolicy.setRetryTimeout(workflowTaskRetryPolicy.getRetryTimeout());
}

return new TaskOptions(retryPolicy);
return retryPolicy;
}

/**
* Converts a {@link WorkflowTaskRetryHandler} to a {@link RetryHandler}.
*
* @param workflowTaskRetryHandler The {@link WorkflowTaskRetryHandler} being converted
* @return A {@link RetryHandler}
*/
private RetryHandler toRetryHandler(WorkflowTaskRetryHandler workflowTaskRetryHandler) {
if (workflowTaskRetryHandler == null) {
return null;
}

return retryContext -> {
WorkflowTaskRetryContext workflowRetryContext = new WorkflowTaskRetryContext(
this,
retryContext.getLastAttemptNumber(),
new DefaultWorkflowFailureDetails(retryContext.getLastFailure()),
retryContext.getTotalRetryTime()
);

return workflowTaskRetryHandler.handle(workflowRetryContext);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@
return workflowFailureDetails.getStackTrace();
}

/**
* Checks whether the failure was caused by the provided exception class.
*
* @param exceptionClass the exception class to check
* @return {@code true} if the failure was caused by the provided exception class
*/
@Override
public boolean isCausedBy(Class<? extends Exception> exceptionClass) {
return workflowFailureDetails.isCausedBy(exceptionClass);

Check warning on line 73 in sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowFailureDetails.java

View check run for this annotation

Codecov / codecov/patch

sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowFailureDetails.java#L73

Added line #L73 was not covered by tests
}

@Override
public String toString() {
return workflowFailureDetails.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package io.dapr.workflows;

import io.dapr.durabletask.CompositeTaskFailedException;
import io.dapr.durabletask.FailureDetails;
import io.dapr.durabletask.RetryContext;
import io.dapr.durabletask.RetryHandler;
import io.dapr.durabletask.Task;
import io.dapr.durabletask.TaskCanceledException;
import io.dapr.durabletask.TaskOptions;
Expand All @@ -35,10 +38,11 @@
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -278,7 +282,7 @@ public void callChildWorkflowWithName() {
}

@Test
public void callChildWorkflowWithOptions() {
public void callChildWorkflowWithRetryPolicy() {
String expectedName = "TestActivity";
String expectedInput = "TestInput";
String expectedInstanceId = "TestInstanceId";
Expand All @@ -305,6 +309,90 @@ public void callChildWorkflowWithOptions() {
assertEquals(retryPolicy.getMaxNumberOfAttempts(), taskOptions.getRetryPolicy().getMaxNumberOfAttempts());
assertEquals(retryPolicy.getFirstRetryInterval(), taskOptions.getRetryPolicy().getFirstRetryInterval());
assertEquals(Duration.ZERO, taskOptions.getRetryPolicy().getRetryTimeout());
assertNull(taskOptions.getRetryHandler());
}

@Test
public void callChildWorkflowWithRetryHandler() {
String expectedName = "TestActivity";
String expectedInput = "TestInput";
String expectedInstanceId = "TestInstanceId";

WorkflowTaskRetryHandler retryHandler = spy(new WorkflowTaskRetryHandler() {
@Override
public boolean handle(WorkflowTaskRetryContext retryContext) {
return true;
}
});

WorkflowTaskOptions executionOptions = new WorkflowTaskOptions(retryHandler);
ArgumentCaptor<TaskOptions> captor = ArgumentCaptor.forClass(TaskOptions.class);

context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, executionOptions, String.class);

verify(mockInnerContext, times(1))
.callSubOrchestrator(
eq(expectedName),
eq(expectedInput),
eq(expectedInstanceId),
captor.capture(),
eq(String.class)
);

TaskOptions taskOptions = captor.getValue();

RetryHandler durableRetryHandler = taskOptions.getRetryHandler();
RetryContext retryContext = mock(RetryContext.class, invocationOnMock -> null);

durableRetryHandler.handle(retryContext);

verify(retryHandler, times(1)).handle(any());
assertNull(taskOptions.getRetryPolicy());
}

@Test
public void callChildWorkflowWithRetryPolicyAndHandler() {
String expectedName = "TestActivity";
String expectedInput = "TestInput";
String expectedInstanceId = "TestInstanceId";

WorkflowTaskRetryPolicy retryPolicy = WorkflowTaskRetryPolicy.newBuilder()
.setMaxNumberOfAttempts(1)
.setFirstRetryInterval(Duration.ofSeconds(10))
.build();

WorkflowTaskRetryHandler retryHandler = spy(new WorkflowTaskRetryHandler() {
@Override
public boolean handle(WorkflowTaskRetryContext retryContext) {
return true;
}
});

WorkflowTaskOptions executionOptions = new WorkflowTaskOptions(retryPolicy, retryHandler);
ArgumentCaptor<TaskOptions> captor = ArgumentCaptor.forClass(TaskOptions.class);

context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, executionOptions, String.class);

verify(mockInnerContext, times(1))
.callSubOrchestrator(
eq(expectedName),
eq(expectedInput),
eq(expectedInstanceId),
captor.capture(),
eq(String.class)
);

TaskOptions taskOptions = captor.getValue();

RetryHandler durableRetryHandler = taskOptions.getRetryHandler();
RetryContext retryContext = mock(RetryContext.class, invocationOnMock -> null);

durableRetryHandler.handle(retryContext);

verify(retryHandler, times(1)).handle(any());
assertEquals(retryPolicy.getMaxNumberOfAttempts(), taskOptions.getRetryPolicy().getMaxNumberOfAttempts());
assertEquals(retryPolicy.getFirstRetryInterval(), taskOptions.getRetryPolicy().getFirstRetryInterval());
assertEquals(Duration.ZERO, taskOptions.getRetryPolicy().getRetryTimeout());
}

@Test
Expand Down
Loading