Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 104 additions & 146 deletions samples/Identity/src/main/java/identity/FleetProvisioningSample.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import utils.commandlineutils.CommandLineUtils;

Expand All @@ -44,9 +46,11 @@ public class FleetProvisioningSample {
static CompletableFuture<Void> gotResponse;
static IotIdentityClient iotIdentityClient;

static CreateKeysAndCertificateResponse createKeysAndCertificateResponse;
static CreateCertificateFromCsrResponse createCertificateFromCsrResponse;
static RegisterThingResponse registerThingResponse;
static CreateKeysAndCertificateResponse createKeysAndCertificateResponse = null;
static CreateCertificateFromCsrResponse createCertificateFromCsrResponse = null;
static RegisterThingResponse registerThingResponse = null;

static long responseWaitTimeMs = 5000L; // 5 seconds

static CommandLineUtils cmdUtils;

Expand All @@ -56,7 +60,6 @@ static void onRejectedKeys(ErrorResponse response) {
", statusCode: " + response.statusCode);

gotResponse.complete(null);
System.exit(1);
}

static void onRejectedCsr(ErrorResponse response) {
Expand All @@ -65,7 +68,6 @@ static void onRejectedCsr(ErrorResponse response) {
", statusCode: " + response.statusCode);

gotResponse.complete(null);
System.exit(1);
}

static void onRejectedRegister(ErrorResponse response) {
Expand All @@ -75,37 +77,48 @@ static void onRejectedRegister(ErrorResponse response) {
", statusCode: " + response.statusCode);

gotResponse.complete(null);
System.exit(1);
}

static void onCreateKeysAndCertificateAccepted(CreateKeysAndCertificateResponse response) {
System.out.println("CreateKeysAndCertificate response certificateId: " + response.certificateId);
if (response != null) {
createKeysAndCertificateResponse = response;
System.out.println("CreateKeysAndCertificate response certificateId: " + response.certificateId);
if (createKeysAndCertificateResponse == null) {
createKeysAndCertificateResponse = response;
} else {
System.out.println("CreateKeysAndCertificate response received after having already gotten a response!");
}
} else {
System.out.println("CreateKeysAndCertificate response is null");
}
gotResponse.complete(null);
}

static void onCreateCertificateFromCsrResponseAccepted(CreateCertificateFromCsrResponse response) {
System.out.println("CreateCertificateFromCsr response certificateId: " + response.certificateId);
if (response != null) {
createCertificateFromCsrResponse = response;
System.out.println("CreateCertificateFromCsr response certificateId: " + response.certificateId);
if (createCertificateFromCsrResponse == null) {
createCertificateFromCsrResponse = response;
} else {
System.out.println("CreateCertificateFromCsr response received after having already gotten a response!");
}
} else {
System.out.println("CreateCertificateFromCsr response is null");
}
gotResponse.complete(null);
}

static void onRegisterThingAccepted(RegisterThingResponse response) {
System.out.println("RegisterThing response thingName: " + response.thingName);
if (response != null) {
gotResponse.complete(null);
registerThingResponse = response;
System.out.println("RegisterThing response thingName: " + response.thingName);
if (registerThingResponse == null) {
registerThingResponse = response;
} else {
System.out.println("RegisterThing response received after having already gotten a response!");
}
} else {
System.out.println("RegisterThing response is null");
}
gotResponse.complete(null);
}

static void onException(Exception e) {
Expand Down Expand Up @@ -145,62 +158,48 @@ public void onConnectionResumed(boolean sessionPresent) {
}
};

try {
MqttClientConnection connection = null;
boolean exitWithError = false;

MqttClientConnection connection = cmdUtils.buildMQTTConnection(callbacks);
try {
connection = cmdUtils.buildMQTTConnection(callbacks);
iotIdentityClient = new IotIdentityClient(connection);

CompletableFuture<Boolean> connected = connection.connect();
try {
boolean sessionPresent = connected.get();
System.out.println("Connected to " + (!sessionPresent ? "new" : "existing") + " session!");
} catch (Exception ex) {
throw new RuntimeException("Exception occurred during connect", ex);
}
boolean sessionPresent = connected.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Connected to " + (!sessionPresent ? "new" : "existing") + " session!");

try {
if (csrPath == null) {
createKeysAndCertificateWorkflow();
} else {
createCertificateFromCsrWorkflow();
}
} catch (Exception e) {
throw new RuntimeException("Exception occurred during connect", e);
if (csrPath == null) {
createKeysAndCertificateWorkflow();
} else {
createCertificateFromCsrWorkflow();
}

CompletableFuture<Void> disconnected = connection.disconnect();
disconnected.get();
disconnected.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);

} catch (Exception ex) {
System.out.println("Exception encountered! " + "\n");
ex.printStackTrace();
exitWithError = true;
}

if (connection != null) {
// Close the connection now that we are completely done with it.
connection.close();

} catch (CrtRuntimeException | InterruptedException | ExecutionException ex) {
System.out.println("Exception encountered: " + ex.toString());
}

CrtResource.waitForNoResources();
System.out.println("Complete!");
}

private static void createKeysAndCertificateWorkflow() throws Exception {
CreateKeysAndCertificateSubscriptionRequest createKeysAndCertificateSubscriptionRequest = new CreateKeysAndCertificateSubscriptionRequest();
CompletableFuture<Integer> keysSubscribedAccepted = iotIdentityClient.SubscribeToCreateKeysAndCertificateAccepted(
createKeysAndCertificateSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
FleetProvisioningSample::onCreateKeysAndCertificateAccepted);

keysSubscribedAccepted.get();
System.out.println("Subscribed to CreateKeysAndCertificateAccepted");

CompletableFuture<Integer> keysSubscribedRejected = iotIdentityClient.SubscribeToCreateKeysAndCertificateRejected(
createKeysAndCertificateSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
FleetProvisioningSample::onRejectedKeys);

keysSubscribedRejected.get();
System.out.println("Subscribed to CreateKeysAndCertificateRejected");
System.out.println("Sample complete!");

if (exitWithError) {
System.exit(1);
} else {
System.exit(0);
}
}

private static void SubscribeToRegisterThing() throws Exception {
RegisterThingSubscriptionRequest registerThingSubscriptionRequest = new RegisterThingSubscriptionRequest();
registerThingSubscriptionRequest.templateName = templateName;

Expand All @@ -210,7 +209,7 @@ private static void createKeysAndCertificateWorkflow() throws Exception {
FleetProvisioningSample::onRegisterThingAccepted,
FleetProvisioningSample::onException);

subscribedRegisterAccepted.get();
subscribedRegisterAccepted.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Subscribed to SubscribeToRegisterThingAccepted");

CompletableFuture<Integer> subscribedRegisterRejected = iotIdentityClient.SubscribeToRegisterThingRejected(
Expand All @@ -219,20 +218,47 @@ private static void createKeysAndCertificateWorkflow() throws Exception {
FleetProvisioningSample::onRejectedRegister,
FleetProvisioningSample::onException);

subscribedRegisterRejected.get();
subscribedRegisterRejected.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Subscribed to SubscribeToRegisterThingRejected");
}

private static void createKeysAndCertificateWorkflow() throws Exception {
CreateKeysAndCertificateSubscriptionRequest createKeysAndCertificateSubscriptionRequest = new CreateKeysAndCertificateSubscriptionRequest();
CompletableFuture<Integer> keysSubscribedAccepted = iotIdentityClient.SubscribeToCreateKeysAndCertificateAccepted(
createKeysAndCertificateSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
FleetProvisioningSample::onCreateKeysAndCertificateAccepted);

keysSubscribedAccepted.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Subscribed to CreateKeysAndCertificateAccepted");

CompletableFuture<Integer> keysSubscribedRejected = iotIdentityClient.SubscribeToCreateKeysAndCertificateRejected(
createKeysAndCertificateSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
FleetProvisioningSample::onRejectedKeys);

keysSubscribedRejected.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Subscribed to CreateKeysAndCertificateRejected");

// Subscribes to the register thing accepted and rejected topics
SubscribeToRegisterThing();

CompletableFuture<Integer> publishKeys = iotIdentityClient.PublishCreateKeysAndCertificate(
new CreateKeysAndCertificateRequest(),
QualityOfService.AT_LEAST_ONCE);

publishKeys.get();
gotResponse = new CompletableFuture<>();
publishKeys.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Published to CreateKeysAndCertificate");
gotResponse.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Got response at CreateKeysAndCertificate");

waitForKeysRequest();
// Verify the response is good
if (createKeysAndCertificateResponse == null) {
throw new Exception("Got invalid/error createKeysAndCertificateResponse");
}

gotResponse = new CompletableFuture<>();

System.out.println("RegisterThing now....");
RegisterThingRequest registerThingRequest = new RegisterThingRequest();
registerThingRequest.certificateOwnershipToken = createKeysAndCertificateResponse.certificateOwnershipToken;
Expand All @@ -246,14 +272,10 @@ private static void createKeysAndCertificateWorkflow() throws Exception {
registerThingRequest,
QualityOfService.AT_LEAST_ONCE);

try {
publishRegister.get();
System.out.println("Published to RegisterThing");
} catch(Exception ex) {
throw new RuntimeException("Exception occurred during publish", ex);
}
gotResponse.get();
waitForRegisterRequest();
publishRegister.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Published to RegisterThing");
gotResponse.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Got response at RegisterThing");
}

private static void createCertificateFromCsrWorkflow() throws Exception {
Expand All @@ -263,53 +285,40 @@ private static void createCertificateFromCsrWorkflow() throws Exception {
QualityOfService.AT_LEAST_ONCE,
FleetProvisioningSample::onCreateCertificateFromCsrResponseAccepted);

csrSubscribedAccepted.get();
csrSubscribedAccepted.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Subscribed to CreateCertificateFromCsrAccepted");

CompletableFuture<Integer> csrSubscribedRejected = iotIdentityClient.SubscribeToCreateCertificateFromCsrRejected(
createCertificateFromCsrSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
FleetProvisioningSample::onRejectedCsr);

csrSubscribedRejected.get();
csrSubscribedRejected.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Subscribed to CreateCertificateFromCsrRejected");

RegisterThingSubscriptionRequest registerThingSubscriptionRequest = new RegisterThingSubscriptionRequest();
registerThingSubscriptionRequest.templateName = templateName;

CompletableFuture<Integer> subscribedRegisterAccepted = iotIdentityClient.SubscribeToRegisterThingAccepted(
registerThingSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
FleetProvisioningSample::onRegisterThingAccepted,
FleetProvisioningSample::onException);

subscribedRegisterAccepted.get();
System.out.println("Subscribed to SubscribeToRegisterThingAccepted");

CompletableFuture<Integer> subscribedRegisterRejected = iotIdentityClient.SubscribeToRegisterThingRejected(
registerThingSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
FleetProvisioningSample::onRejectedRegister,
FleetProvisioningSample::onException);

subscribedRegisterRejected.get();
System.out.println("Subscribed to SubscribeToRegisterThingRejected");
// Subscribes to the register thing accepted and rejected topics
SubscribeToRegisterThing();

String csrContents = new String(Files.readAllBytes(Paths.get(csrPath)));

CreateCertificateFromCsrRequest createCertificateFromCsrRequest = new CreateCertificateFromCsrRequest();
createCertificateFromCsrRequest.certificateSigningRequest = csrContents;
CompletableFuture<Integer> publishCsr = iotIdentityClient.PublishCreateCertificateFromCsr(
createCertificateFromCsrRequest,
QualityOfService.AT_LEAST_ONCE);

publishCsr.get();
gotResponse = new CompletableFuture<>();
publishCsr.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Published to CreateCertificateFromCsr");
gotResponse.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Got response at CreateCertificateFromCsr");

waitForCsrRequest();
// Verify the response is good
if (createCertificateFromCsrResponse == null) {
throw new Exception("Got invalid/error createCertificateFromCsrResponse");
}

gotResponse = new CompletableFuture<>();

System.out.println("RegisterThing now....");
RegisterThingRequest registerThingRequest = new RegisterThingRequest();
registerThingRequest.certificateOwnershipToken = createCertificateFromCsrResponse.certificateOwnershipToken;
registerThingRequest.templateName = templateName;
Expand All @@ -318,60 +327,9 @@ private static void createCertificateFromCsrWorkflow() throws Exception {
registerThingRequest,
QualityOfService.AT_LEAST_ONCE);

publishRegister.get();
publishRegister.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Published to RegisterThing");

waitForRegisterRequest();
}

public static void waitForCsrRequest() {
try {
// Wait for the response.
int loopCount = 0;
while (loopCount < 30 && createCertificateFromCsrResponse == null) {
if (createCertificateFromCsrResponse != null) {
break;
}
System.out.println("Waiting...for CreateCertificateFromCsrResponse");
loopCount += 1;
Thread.sleep(50L);
}
} catch (InterruptedException e) {
System.out.println("Exception occured");
}
}

public static void waitForKeysRequest() {
try {
// Wait for the response.
int loopCount = 0;
while (loopCount < 30 && createKeysAndCertificateResponse == null) {
if (createKeysAndCertificateResponse != null) {
break;
}
System.out.println("Waiting...for CreateKeysAndCertificateResponse");
loopCount += 1;
Thread.sleep(50L);
}
} catch (InterruptedException e) {
System.out.println("Exception occured");
}
}

public static void waitForRegisterRequest() {
try {
// Wait for the response.
int loopCount = 0;
while (loopCount < 30 && registerThingResponse == null) {
if (registerThingResponse != null) {
break;
}
System.out.println("Waiting...for registerThingResponse");
loopCount += 1;
Thread.sleep(50L);
}
} catch (InterruptedException e) {
System.out.println("Exception occured");
}
gotResponse.get(responseWaitTimeMs, TimeUnit.MILLISECONDS);
System.out.println("Got response at RegisterThing");
}
}