From 564a71f302a7c5abcf2ca8948918b7d33e586363 Mon Sep 17 00:00:00 2001 From: Kirstyn Joy Amperiadis Date: Tue, 21 Mar 2023 09:55:50 -0500 Subject: [PATCH 1/5] Support restartInstance and add restartPostUri in HttpManagementPayload --- .../azurefunctions/HttpManagementPayload.java | 33 +++++++++++++ .../durabletask/DurableTaskClient.java | 2 + .../durabletask/DurableTaskGrpcClient.java | 18 +++++++ .../durabletask/IntegrationTests.java | 49 +++++++++++++++++++ 4 files changed, 102 insertions(+) diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java index bfd15cf0..6c937b9e 100644 --- a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java +++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java @@ -12,8 +12,11 @@ public class HttpManagementPayload { private final String id; private final String purgeHistoryDeleteUri; + private final String restartPostUri; + private final String resumePostUri; private final String sendEventPostUri; private final String statusQueryGetUri; + private final String suspendPostUri; private final String terminatePostUri; /** @@ -29,8 +32,11 @@ public HttpManagementPayload( String requiredQueryStringParameters) { this.id = instanceId; this.purgeHistoryDeleteUri = instanceStatusURL + "?" + requiredQueryStringParameters; + this.restartPostUri = instanceStatusURL + "/restart?" + requiredQueryStringParameters; + this.resumePostUri = instanceStatusURL + "/resume?reason={text}&" + requiredQueryStringParameters; this.sendEventPostUri = instanceStatusURL + "/raiseEvent/{eventName}?" + requiredQueryStringParameters; this.statusQueryGetUri = instanceStatusURL + "?" + requiredQueryStringParameters; + this.suspendPostUri = instanceStatusURL + "/suspend?reason={text}&" + requiredQueryStringParameters; this.terminatePostUri = instanceStatusURL + "/terminate?reason={text}&" + requiredQueryStringParameters; } @@ -78,4 +84,31 @@ public String getTerminatePostUri() { public String getPurgeHistoryDeleteUri() { return this.purgeHistoryDeleteUri; } + + /** + * Gets the HTTP POST instance restart endpoint. + * + * @return The HTTP URL for posting instance restart commands. + */ + public String getRestartPostUri() { + return restartPostUri; + } + + /** + * Gets the HTTP POST instance resume endpoint. + * + * @return The HTTP URL for posting instance resume commands. + */ + public String getResumePostUri() { + return resumePostUri; + } + + /** + * Gets the HTTP POST instance suspend endpoint. + * + * @return The HTTP URL for posting instance suspend commands. + */ + public String getSuspendPostUri() { + return suspendPostUri; + } } diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java index 050ef4f7..25b0f7ac 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java @@ -311,4 +311,6 @@ public void resumeInstance(String instanceId) { * @param reason the reason for resuming the orchestration instance */ public abstract void resumeInstance(String instanceId, @Nullable String reason); + + public abstract String restartInstance(String instanceId, boolean restartWithNewInstanceId); } \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index e078aade..95bb984a 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -303,6 +303,24 @@ public void resumeInstance(String instanceId, @Nullable String reason) { this.sidecarClient.resumeInstance(resumeRequestBuilder.build()); } + @Override + public String restartInstance(String instanceId, boolean restartWithNewInstanceId) { + OrchestrationMetadata metadata = this.getInstanceMetadata(instanceId, true); + if (!metadata.isInstanceFound()) { + throw new IllegalArgumentException(new StringBuilder() + .append("An orchestration with instanceId ") + .append(instanceId) + .append(" was not found.").toString()); + } + + if (restartWithNewInstanceId) { + return this.scheduleNewOrchestrationInstance(metadata.getName(), this.dataConverter.deserialize(metadata.getSerializedInput(), Object.class)); + } + else { + return this.scheduleNewOrchestrationInstance(metadata.getName(), this.dataConverter.deserialize(metadata.getSerializedInput(), Object.class), metadata.getInstanceId()); + } + } + private PurgeResult toPurgeResult(PurgeInstancesResponse response){ return new PurgeResult(response.getDeletedInstanceCount()); } diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index af230169..85a89d70 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -408,6 +408,55 @@ void termination() throws TimeoutException { } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void restartOrchestrationWithNewInstanceId(boolean restartWithNewInstanceId) throws TimeoutException { + final String orchestratorName = "restart"; + final Duration delay = Duration.ofSeconds(3); + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> ctx.createTimer(delay).await()) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, "RestartTest"); + client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + String newInstanceId = client.restartInstance(instanceId, restartWithNewInstanceId); + OrchestrationMetadata instance = client.waitForInstanceCompletion(newInstanceId, defaultTimeout, true); + + if (restartWithNewInstanceId) { + assertNotEquals(instanceId, newInstanceId); + } else { + assertEquals(instanceId, newInstanceId); + } + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals("\"RestartTest\"", instance.getSerializedInput()); + } + } + + @Test + void restartOrchestrationThrowsException() { + final String orchestratorName = "restart"; + final Duration delay = Duration.ofSeconds(3); + final String nonExistentId = "123"; + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> ctx.createTimer(delay).await()) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, "RestartTest"); + + assertThrows( + IllegalArgumentException.class, + () -> client.restartInstance(nonExistentId, true) + ); + } + + } + @Test void suspendResumeOrchestration() throws TimeoutException, InterruptedException { final String orchestratorName = "suspend"; From 79e85292e07bdb1f4bca67bc4d5d1c408fe4fefa Mon Sep 17 00:00:00 2001 From: Kirstyn Joy Amperiadis Date: Tue, 21 Mar 2023 10:01:39 -0500 Subject: [PATCH 2/5] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 367b2b1a..d37abb29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * Fix the potential NPE issue of `DurableTaskClient terminate` method ([#104](https://github.com/microsoft/durabletask-java/issues/104)) * Add waitForCompletionOrCreateCheckStatusResponse client API ([#115](https://github.com/microsoft/durabletask-java/pull/115)) * Support long timers by breaking up into smaller timers ([#114](https://github.com/microsoft/durabletask-java/issues/114)) +* Support restartInstance and pass restartPostUri in HttpManagementPayload ([#108](https://github.com/microsoft/durabletask-java/issues/108)) ## v1.0.0 From ec0fe39d1454677c837d6b35b1c6169c990d7791 Mon Sep 17 00:00:00 2001 From: Kirstyn Joy Amperiadis Date: Thu, 30 Mar 2023 15:27:08 -0500 Subject: [PATCH 3/5] Add func description and remove unused var in test --- .../java/com/microsoft/durabletask/DurableTaskClient.java | 8 ++++++++ .../java/com/microsoft/durabletask/IntegrationTests.java | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java index 25b0f7ac..af81abee 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java @@ -312,5 +312,13 @@ public void resumeInstance(String instanceId) { */ public abstract void resumeInstance(String instanceId, @Nullable String reason); + /** + * Restarts an existing orchestration instance with the original input. + * @param instanceId the ID of the previously run orchestration instance to restart. + * @param restartWithNewInstanceId true to restart the orchestration instance with a new instance ID + * false to restart the orchestration instance with same instance ID + * @return the ID of the scheduled orchestration instance, which is either instanceId or randomly + * generated depending on the value of restartWithNewInstanceId + */ public abstract String restartInstance(String instanceId, boolean restartWithNewInstanceId); } \ No newline at end of file diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 85a89d70..82f8beba 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -447,7 +447,7 @@ void restartOrchestrationThrowsException() { DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, "RestartTest"); + client.scheduleNewOrchestrationInstance(orchestratorName, "RestartTest"); assertThrows( IllegalArgumentException.class, From 5e7aba3de3ed47f261e7a3d401b63b7d2799dd05 Mon Sep 17 00:00:00 2001 From: Kirstyn Joy Amperiadis Date: Thu, 6 Jul 2023 15:37:33 -0500 Subject: [PATCH 4/5] Remove unsupported resume and suspend post URIs --- .../azurefunctions/HttpManagementPayload.java | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java index 6c937b9e..4cb45bc8 100644 --- a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java +++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java @@ -13,10 +13,8 @@ public class HttpManagementPayload { private final String id; private final String purgeHistoryDeleteUri; private final String restartPostUri; - private final String resumePostUri; private final String sendEventPostUri; private final String statusQueryGetUri; - private final String suspendPostUri; private final String terminatePostUri; /** @@ -33,10 +31,8 @@ public HttpManagementPayload( this.id = instanceId; this.purgeHistoryDeleteUri = instanceStatusURL + "?" + requiredQueryStringParameters; this.restartPostUri = instanceStatusURL + "/restart?" + requiredQueryStringParameters; - this.resumePostUri = instanceStatusURL + "/resume?reason={text}&" + requiredQueryStringParameters; this.sendEventPostUri = instanceStatusURL + "/raiseEvent/{eventName}?" + requiredQueryStringParameters; this.statusQueryGetUri = instanceStatusURL + "?" + requiredQueryStringParameters; - this.suspendPostUri = instanceStatusURL + "/suspend?reason={text}&" + requiredQueryStringParameters; this.terminatePostUri = instanceStatusURL + "/terminate?reason={text}&" + requiredQueryStringParameters; } @@ -94,21 +90,4 @@ public String getRestartPostUri() { return restartPostUri; } - /** - * Gets the HTTP POST instance resume endpoint. - * - * @return The HTTP URL for posting instance resume commands. - */ - public String getResumePostUri() { - return resumePostUri; - } - - /** - * Gets the HTTP POST instance suspend endpoint. - * - * @return The HTTP URL for posting instance suspend commands. - */ - public String getSuspendPostUri() { - return suspendPostUri; - } } From b7a47315f45540c23fd974ab5102effb15a3f0a7 Mon Sep 17 00:00:00 2001 From: Kirstyn Joy Amperiadis Date: Fri, 7 Jul 2023 11:33:09 -0500 Subject: [PATCH 5/5] Add end to end tests --- .../java/com/functions/EndToEndTests.java | 51 ++++++++++++++++--- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java index a174b39b..fa5f64df 100644 --- a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java +++ b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java @@ -5,6 +5,8 @@ import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import static io.restassured.RestAssured.get; import static io.restassured.RestAssured.post; @@ -26,14 +28,7 @@ public void basicChain() throws InterruptedException { Response response = post(startOrchestrationPath); JsonPath jsonPath = response.jsonPath(); String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); - String runTimeStatus = null; - for (int i = 0; i < 15; i++) { - Response statusResponse = get(statusQueryGetUri); - runTimeStatus = statusResponse.jsonPath().get("runtimeStatus"); - if (!"Completed".equals(runTimeStatus)) { - Thread.sleep(1000); - } else break; - } + String runTimeStatus = waitForCompletion(statusQueryGetUri); assertEquals("Completed", runTimeStatus); } @@ -59,4 +54,44 @@ public void continueAsNew() throws InterruptedException { runTimeStatus = statusResponse.jsonPath().get("runtimeStatus"); assertEquals("Terminated", runTimeStatus); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void restart(boolean restartWithNewInstanceId) throws InterruptedException { + String startOrchestrationPath = "/api/StartOrchestration"; + Response response = post(startOrchestrationPath); + JsonPath jsonPath = response.jsonPath(); + String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); + String runTimeStatus = waitForCompletion(statusQueryGetUri); + assertEquals("Completed", runTimeStatus); + Response statusResponse = get(statusQueryGetUri); + String instanceId = statusResponse.jsonPath().get("instanceId"); + + String restartPostUri = jsonPath.get("restartPostUri") + "&restartWithNewInstanceId=" + restartWithNewInstanceId; + Response restartResponse = post(restartPostUri); + JsonPath restartJsonPath = restartResponse.jsonPath(); + String restartStatusQueryGetUri = restartJsonPath.get("statusQueryGetUri"); + String restartRuntimeStatus = waitForCompletion(restartStatusQueryGetUri); + assertEquals("Completed", restartRuntimeStatus); + Response restartStatusResponse = get(restartStatusQueryGetUri); + String newInstanceId = restartStatusResponse.jsonPath().get("instanceId"); + if (restartWithNewInstanceId) { + assertNotEquals(instanceId, newInstanceId); + } else { + assertEquals(instanceId, newInstanceId); + } + } + + private String waitForCompletion(String statusQueryGetUri) throws InterruptedException { + String runTimeStatus = null; + for (int i = 0; i < 15; i++) { + Response statusResponse = get(statusQueryGetUri); + runTimeStatus = statusResponse.jsonPath().get("runtimeStatus"); + if (!"Completed".equals(runTimeStatus)) { + Thread.sleep(1000); + } else break; + } + return runTimeStatus; + } + }