From a5fb86ce86a9bc0830b1f004c4ee8e8cb2cb3bb6 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Thu, 11 Jul 2024 16:50:39 -0700 Subject: [PATCH 1/4] Fix race conditions in fleet provisioning samples --- .../fleet_provisioning/main.cpp | 636 +++++++++--------- .../mqtt5_fleet_provisioning/main.cpp | 633 +++++++++-------- 2 files changed, 614 insertions(+), 655 deletions(-) diff --git a/samples/fleet_provisioning/fleet_provisioning/main.cpp b/samples/fleet_provisioning/fleet_provisioning/main.cpp index 3fb3d55c4..1dbcd0ea2 100644 --- a/samples/fleet_provisioning/fleet_provisioning/main.cpp +++ b/samples/fleet_provisioning/fleet_provisioning/main.cpp @@ -4,8 +4,6 @@ */ #include #include -#include -#include #include @@ -21,29 +19,12 @@ #include #include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include #include "../../utils/CommandLineUtils.h" using namespace Aws::Crt; using namespace Aws::Iotidentity; -using namespace std::this_thread; // sleep_for, sleep_until -using namespace std::chrono; // nanoseconds, system_clock, seconds - -static void sleep(int sleeptime) -{ - std::cout << "Sleeping for " << sleeptime << " seconds..." << std::endl; - sleep_until(system_clock::now() + seconds(sleeptime)); -} static std::string getFileData(std::string const &fileName) { @@ -53,6 +34,201 @@ static std::string getFileData(std::string const &fileName) return str; } +/** + * Auxiliary structure for holding data used when creating a certificate. + */ +struct CreateCertificateContext +{ + std::promise pubAckPromise; + std::promise acceptedSubAckPromise; + std::promise rejectedSubAckPromise; + std::promise tokenReceivedPromise; + String token; +}; + +/** + * Keys-and-Certificate workflow. + * + * @note Subscriptions created here will be active even after the function completes. So, all variables accessed in the + * callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of + * CreateCertificateContext is used to store variables used by the callbacks. + */ +void useKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateContext &ctx) +{ + auto onKeysPublishPubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error publishing to CreateKeysAndCertificate: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + ctx.pubAckPromise.set_value(); + }; + + auto onKeysAcceptedSubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to CreateKeysAndCertificate accepted: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + ctx.acceptedSubAckPromise.set_value(); + }; + + auto onKeysRejectedSubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to CreateKeysAndCertificate rejected: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + ctx.rejectedSubAckPromise.set_value(); + }; + + auto onKeysAccepted = [&ctx](CreateKeysAndCertificateResponse *response, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) + { + fprintf(stdout, "CreateKeysAndCertificateResponse certificateId: %s.\n", response->CertificateId->c_str()); + ctx.token = *response->CertificateOwnershipToken; + ctx.tokenReceivedPromise.set_value(); + } + else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + }; + + auto onKeysRejected = [&](ErrorResponse *error, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) + { + fprintf( + stdout, + "CreateKeysAndCertificate failed with statusCode %d, errorMessage %s and errorCode %s.", + *error->StatusCode, + error->ErrorMessage->c_str(), + error->ErrorCode->c_str()); + exit(-1); + } + else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + }; + + fprintf(stdout, "Subscribing to CreateKeysAndCertificate Accepted and Rejected topics\n"); + CreateKeysAndCertificateSubscriptionRequest keySubscriptionRequest; + identityClient.SubscribeToCreateKeysAndCertificateAccepted( + keySubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysAccepted, onKeysAcceptedSubAck); + identityClient.SubscribeToCreateKeysAndCertificateRejected( + keySubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysRejected, onKeysRejectedSubAck); + + // Wait for the subscriptions to the accept and reject keys-and-certificate topics to be established. + ctx.acceptedSubAckPromise.get_future().wait(); + ctx.rejectedSubAckPromise.get_future().wait(); + + // Now, when we subscribed to the keys and certificate topics, we can make a request for a certificate. + fprintf(stdout, "Publishing to CreateKeysAndCertificate topic\n"); + CreateKeysAndCertificateRequest createKeysAndCertificateRequest; + identityClient.PublishCreateKeysAndCertificate( + createKeysAndCertificateRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysPublishPubAck); + ctx.pubAckPromise.get_future().wait(); + + // Wait for a certificate token. + ctx.tokenReceivedPromise.get_future().wait(); +} + +/** + * Certificate-from-CSR workflow. + * + * @note Subscriptions created here will be active even after the function completes. So, all variables accessed in the + * callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of + * CreateCertificateContext is used to store variables used by the callbacks. + */ +void useCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, const String &csrFile) +{ + auto onCsrPublishPubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error publishing to CreateCertificateFromCsr: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + ctx.pubAckPromise.set_value(); + }; + + auto onCsrAcceptedSubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to CreateCertificateFromCsr accepted: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + ctx.acceptedSubAckPromise.set_value(); + }; + + auto onCsrRejectedSubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to CreateCertificateFromCsr rejected: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + ctx.rejectedSubAckPromise.set_value(); + }; + + auto onCsrAccepted = [&ctx](CreateCertificateFromCsrResponse *response, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) + { + fprintf(stdout, "CreateCertificateFromCsrResponse certificateId: %s.\n", response->CertificateId->c_str()); + ctx.token = *response->CertificateOwnershipToken; + ctx.tokenReceivedPromise.set_value(); + } + else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + }; + + auto onCsrRejected = [&](ErrorResponse *error, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) + { + fprintf( + stdout, + "CreateCertificateFromCsr failed with statusCode %d, errorMessage %s and errorCode %s.", + *error->StatusCode, + error->ErrorMessage->c_str(), + error->ErrorCode->c_str()); + exit(-1); + } + else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + }; + + // CreateCertificateFromCsr workflow + fprintf(stdout, "Subscribing to CreateCertificateFromCsr Accepted and Rejected topics\n"); + CreateCertificateFromCsrSubscriptionRequest csrSubscriptionRequest; + identityClient.SubscribeToCreateCertificateFromCsrAccepted( + csrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrAccepted, onCsrAcceptedSubAck); + + identityClient.SubscribeToCreateCertificateFromCsrRejected( + csrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrRejected, onCsrRejectedSubAck); + + // Wait for the subscriptions to the accept and reject certificates topics to be established. + ctx.acceptedSubAckPromise.get_future().wait(); + ctx.rejectedSubAckPromise.get_future().wait(); + + // Now, when we subscribed to the certificates topics, we can make a request for a certificate. + fprintf(stdout, "Publishing to CreateCertificateFromCsr topic\n"); + CreateCertificateFromCsrRequest createCertificateFromCsrRequest; + createCertificateFromCsrRequest.CertificateSigningRequest = csrFile; + identityClient.PublishCreateCertificateFromCsr( + createCertificateFromCsrRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrPublishPubAck); + ctx.pubAckPromise.get_future().wait(); + + // Wait for a certificate token. + ctx.tokenReceivedPromise.get_future().wait(); +} + int main(int argc, char *argv[]) { /************************ Setup ****************************/ @@ -61,7 +237,6 @@ int main(int argc, char *argv[]) ApiHandle apiHandle; // Variables for the sample String csrFile; - String token; RegisterThingResponse registerThingResponse; /** @@ -78,7 +253,7 @@ int main(int argc, char *argv[]) /** * In a real world application you probably don't want to enforce synchronous behavior - * but this is a sample console application, so we'll just do that with a condition variable. + * but this is a sample console application, so we'll just do that with a promise. */ std::promise connectionCompletedPromise; std::promise connectionClosedPromise; @@ -147,322 +322,127 @@ int main(int argc, char *argv[]) exit(-1); } - if (connectionCompletedPromise.get_future().get()) + if (!connectionCompletedPromise.get_future().get()) { - IotIdentityClient identityClient(connection); - - std::promise csrPublishCompletedPromise; - std::promise csrAcceptedCompletedPromise; - std::promise csrRejectedCompletedPromise; - - std::promise keysPublishCompletedPromise; - std::promise keysAcceptedCompletedPromise; - std::promise keysRejectedCompletedPromise; - - std::promise registerPublishCompletedPromise; - std::promise registerAcceptedCompletedPromise; - std::promise registerRejectedCompletedPromise; - - auto onCsrPublishSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error publishing to CreateCertificateFromCsr: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - - csrPublishCompletedPromise.set_value(); - }; - - auto onCsrAcceptedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf( - stderr, "Error subscribing to CreateCertificateFromCsr accepted: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - - csrAcceptedCompletedPromise.set_value(); - }; - - auto onCsrRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf( - stderr, "Error subscribing to CreateCertificateFromCsr rejected: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - csrRejectedCompletedPromise.set_value(); - }; - - auto onCsrAccepted = [&](CreateCertificateFromCsrResponse *response, int ioErr) { - if (ioErr == AWS_OP_SUCCESS) - { - fprintf( - stdout, "CreateCertificateFromCsrResponse certificateId: %s.\n", response->CertificateId->c_str()); - token = *response->CertificateOwnershipToken; - } - else - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - }; - - auto onCsrRejected = [&](ErrorResponse *error, int ioErr) { - if (ioErr == AWS_OP_SUCCESS) - { - fprintf( - stdout, - "CreateCertificateFromCsr failed with statusCode %d, errorMessage %s and errorCode %s.", - *error->StatusCode, - error->ErrorMessage->c_str(), - error->ErrorCode->c_str()); - exit(-1); - } - else - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - }; - - auto onKeysPublishSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error publishing to CreateKeysAndCertificate: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - - keysPublishCompletedPromise.set_value(); - }; - - auto onKeysAcceptedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf( - stderr, "Error subscribing to CreateKeysAndCertificate accepted: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - - keysAcceptedCompletedPromise.set_value(); - }; - - auto onKeysRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf( - stderr, "Error subscribing to CreateKeysAndCertificate rejected: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - keysRejectedCompletedPromise.set_value(); - }; - - auto onKeysAccepted = [&](CreateKeysAndCertificateResponse *response, int ioErr) { - if (ioErr == AWS_OP_SUCCESS) - { - fprintf( - stdout, "CreateKeysAndCertificateResponse certificateId: %s.\n", response->CertificateId->c_str()); - token = *response->CertificateOwnershipToken; - } - else - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - }; - - auto onKeysRejected = [&](ErrorResponse *error, int ioErr) { - if (ioErr == AWS_OP_SUCCESS) - { - fprintf( - stdout, - "CreateKeysAndCertificate failed with statusCode %d, errorMessage %s and errorCode %s.", - *error->StatusCode, - error->ErrorMessage->c_str(), - error->ErrorCode->c_str()); - exit(-1); - } - else - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - }; - - auto onRegisterAcceptedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } + return -1; + } + + IotIdentityClient identityClient(connection); + + std::promise registerPublishPubAckCompletedPromise; + std::promise registerAcceptedSubAckCompletedPromise; + std::promise registerRejectedSubAckCompletedPromise; + std::promise registerAcceptedCompletedPromise; + + auto onRegisterAcceptedSubAck = [&](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + registerAcceptedSubAckCompletedPromise.set_value(); + }; + + auto onRegisterRejectedSubAck = [&](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + registerRejectedSubAckCompletedPromise.set_value(); + }; + + auto onRegisterAccepted = [&](RegisterThingResponse *response, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) + { + fprintf(stdout, "RegisterThingResponse ThingName: %s.\n", response->ThingName->c_str()); registerAcceptedCompletedPromise.set_value(); - }; - - auto onRegisterRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - registerRejectedCompletedPromise.set_value(); - }; - - auto onRegisterAccepted = [&](RegisterThingResponse *response, int ioErr) { - if (ioErr == AWS_OP_SUCCESS) - { - fprintf(stdout, "RegisterThingResponse ThingName: %s.\n", response->ThingName->c_str()); - } - else - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - }; - - auto onRegisterRejected = [&](ErrorResponse *error, int ioErr) { - if (ioErr == AWS_OP_SUCCESS) - { - fprintf( - stdout, - "RegisterThing failed with statusCode %d, errorMessage %s and errorCode %s.", - *error->StatusCode, - error->ErrorMessage->c_str(), - error->ErrorCode->c_str()); - } - else - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - }; - - auto onRegisterPublishSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - - registerPublishCompletedPromise.set_value(); - }; - - if (csrFile.empty()) + } + else { - // CreateKeysAndCertificate workflow - std::cout << "Subscribing to CreateKeysAndCertificate Accepted and Rejected topics" << std::endl; - CreateKeysAndCertificateSubscriptionRequest keySubscriptionRequest; - identityClient.SubscribeToCreateKeysAndCertificateAccepted( - keySubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysAccepted, onKeysAcceptedSubAck); - - identityClient.SubscribeToCreateKeysAndCertificateRejected( - keySubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysRejected, onKeysRejectedSubAck); - - std::cout << "Publishing to CreateKeysAndCertificate topic" << std::endl; - CreateKeysAndCertificateRequest createKeysAndCertificateRequest; - identityClient.PublishCreateKeysAndCertificate( - createKeysAndCertificateRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysPublishSubAck); - - std::cout << "Subscribing to RegisterThing Accepted and Rejected topics" << std::endl; - RegisterThingSubscriptionRequest registerSubscriptionRequest; - registerSubscriptionRequest.TemplateName = cmdData.input_templateName; - - identityClient.SubscribeToRegisterThingAccepted( - registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterAccepted, onRegisterAcceptedSubAck); - - identityClient.SubscribeToRegisterThingRejected( - registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterRejected, onRegisterRejectedSubAck); - - sleep(1); - - std::cout << "Publishing to RegisterThing topic" << std::endl; - RegisterThingRequest registerThingRequest; - registerThingRequest.TemplateName = cmdData.input_templateName; - - const Aws::Crt::String jsonValue = cmdData.input_templateParameters; - Aws::Crt::JsonObject value(jsonValue); - Map pm = value.View().GetAllObjects(); - Aws::Crt::Map params = - Aws::Crt::Map(); - - for (const auto &x : pm) - { - params.emplace(x.first, x.second.AsString()); - } - - registerThingRequest.Parameters = params; - registerThingRequest.CertificateOwnershipToken = token; - - identityClient.PublishRegisterThing( - registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishSubAck); - sleep(1); - - keysPublishCompletedPromise.get_future().wait(); - keysAcceptedCompletedPromise.get_future().wait(); - keysRejectedCompletedPromise.get_future().wait(); - registerPublishCompletedPromise.get_future().wait(); - registerAcceptedCompletedPromise.get_future().wait(); - registerRejectedCompletedPromise.get_future().wait(); + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + }; + + auto onRegisterRejected = [&](ErrorResponse *error, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) + { + fprintf( + stdout, + "RegisterThing failed with statusCode %d, errorMessage %s and errorCode %s.", + *error->StatusCode, + error->ErrorMessage->c_str(), + error->ErrorCode->c_str()); } else { - // CreateCertificateFromCsr workflow - std::cout << "Subscribing to CreateCertificateFromCsr Accepted and Rejected topics" << std::endl; - CreateCertificateFromCsrSubscriptionRequest csrSubscriptionRequest; - identityClient.SubscribeToCreateCertificateFromCsrAccepted( - csrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrAccepted, onCsrAcceptedSubAck); - - identityClient.SubscribeToCreateCertificateFromCsrRejected( - csrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrRejected, onCsrRejectedSubAck); - - std::cout << "Publishing to CreateCertificateFromCsr topic" << std::endl; - CreateCertificateFromCsrRequest createCertificateFromCsrRequest; - createCertificateFromCsrRequest.CertificateSigningRequest = csrFile; - identityClient.PublishCreateCertificateFromCsr( - createCertificateFromCsrRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrPublishSubAck); - - std::cout << "Subscribing to RegisterThing Accepted and Rejected topics" << std::endl; - RegisterThingSubscriptionRequest registerSubscriptionRequest; - registerSubscriptionRequest.TemplateName = cmdData.input_templateName; - - identityClient.SubscribeToRegisterThingAccepted( - registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterAccepted, onRegisterAcceptedSubAck); - - identityClient.SubscribeToRegisterThingRejected( - registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterRejected, onRegisterRejectedSubAck); - - sleep(2); - - std::cout << "Publishing to RegisterThing topic" << std::endl; - RegisterThingRequest registerThingRequest; - registerThingRequest.TemplateName = cmdData.input_templateName; - - const Aws::Crt::String jsonValue = cmdData.input_templateParameters; - Aws::Crt::JsonObject value(jsonValue); - Map pm = value.View().GetAllObjects(); - Aws::Crt::Map params = - Aws::Crt::Map(); - - for (const auto &x : pm) - { - params.emplace(x.first, x.second.AsString()); - } - - registerThingRequest.Parameters = params; - registerThingRequest.CertificateOwnershipToken = token; - - identityClient.PublishRegisterThing( - registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishSubAck); - sleep(2); - - csrPublishCompletedPromise.get_future().wait(); - csrAcceptedCompletedPromise.get_future().wait(); - csrRejectedCompletedPromise.get_future().wait(); - registerPublishCompletedPromise.get_future().wait(); - registerAcceptedCompletedPromise.get_future().wait(); - registerRejectedCompletedPromise.get_future().wait(); + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + }; + + auto onRegisterPublishPubAck = [&](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr)); + exit(-1); } + + registerPublishPubAckCompletedPromise.set_value(); + }; + + // Create certificate. + CreateCertificateContext certificateContext; + if (csrFile.empty()) + { + useKeysAndCertificate(identityClient, certificateContext); } + else + { + useCsr(identityClient, certificateContext, csrFile); + } + + // After certificate is obtained, it's time to register a thing. + fprintf(stdout, "Subscribing to RegisterThing Accepted and Rejected topics\n"); + RegisterThingSubscriptionRequest registerSubscriptionRequest; + registerSubscriptionRequest.TemplateName = cmdData.input_templateName; + + identityClient.SubscribeToRegisterThingAccepted( + registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterAccepted, onRegisterAcceptedSubAck); + + identityClient.SubscribeToRegisterThingRejected( + registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterRejected, onRegisterRejectedSubAck); + + // Wait for the subscriptions to the accept and reject RegisterThing topics to be established. + registerAcceptedSubAckCompletedPromise.get_future().wait(); + registerRejectedSubAckCompletedPromise.get_future().wait(); + + fprintf(stdout, "Publishing to RegisterThing topic\n"); + RegisterThingRequest registerThingRequest; + registerThingRequest.TemplateName = cmdData.input_templateName; + + const Aws::Crt::String jsonValue = cmdData.input_templateParameters; + Aws::Crt::JsonObject value(jsonValue); + Map pm = value.View().GetAllObjects(); + Aws::Crt::Map params = Aws::Crt::Map(); + + for (const auto &x : pm) + { + params.emplace(x.first, x.second.AsString()); + } + + registerThingRequest.Parameters = params; + // NOTE: In a real application creating multiple certificates you'll probably need to protect token var with + // a critical section. This sample makes only one request for a certificate, so no data race is possible. + registerThingRequest.CertificateOwnershipToken = certificateContext.token; + + identityClient.PublishRegisterThing(registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishPubAck); + registerPublishPubAckCompletedPromise.get_future().wait(); + + // Wait for registering a thing to succeed. + registerAcceptedCompletedPromise.get_future().wait(); // Disconnect if (connection->Disconnect()) diff --git a/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp b/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp index 994d83a56..21ed958a8 100644 --- a/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp +++ b/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp @@ -4,8 +4,6 @@ */ #include #include -#include -#include #include #include #include @@ -22,29 +20,12 @@ #include #include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include #include "../../utils/CommandLineUtils.h" using namespace Aws::Crt; using namespace Aws::Iotidentity; -using namespace std::this_thread; // sleep_for, sleep_until -using namespace std::chrono; // nanoseconds, system_clock, seconds - -static void sleep(int sleeptime) -{ - std::cout << "Sleeping for " << sleeptime << " seconds..." << std::endl; - sleep_until(system_clock::now() + seconds(sleeptime)); -} static std::string getFileData(std::string const &fileName) { @@ -54,6 +35,201 @@ static std::string getFileData(std::string const &fileName) return str; } +/** + * Auxiliary structure for holding data used when creating a certificate. + */ +struct CreateCertificateContext +{ + std::promise pubAckPromise; + std::promise acceptedSubAckPromise; + std::promise rejectedSubAckPromise; + std::promise tokenReceivedPromise; + String token; +}; + +/** + * Keys-and-Certificate workflow. + * + * @note Subscriptions created here will be active even after the function completes. So, all variables accessed in the + * callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of + * CreateCertificateContext is used to store variables used by the callbacks. + */ +void useKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateContext &ctx) +{ + auto onKeysPublishPubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error publishing to CreateKeysAndCertificate: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + ctx.pubAckPromise.set_value(); + }; + + auto onKeysAcceptedSubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to CreateKeysAndCertificate accepted: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + ctx.acceptedSubAckPromise.set_value(); + }; + + auto onKeysRejectedSubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to CreateKeysAndCertificate rejected: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + ctx.rejectedSubAckPromise.set_value(); + }; + + auto onKeysAccepted = [&ctx](CreateKeysAndCertificateResponse *response, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) + { + fprintf(stdout, "CreateKeysAndCertificateResponse certificateId: %s.\n", response->CertificateId->c_str()); + ctx.token = *response->CertificateOwnershipToken; + ctx.tokenReceivedPromise.set_value(); + } + else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + }; + + auto onKeysRejected = [&](ErrorResponse *error, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) + { + fprintf( + stdout, + "CreateKeysAndCertificate failed with statusCode %d, errorMessage %s and errorCode %s.", + *error->StatusCode, + error->ErrorMessage->c_str(), + error->ErrorCode->c_str()); + exit(-1); + } + else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + }; + + fprintf(stdout, "Subscribing to CreateKeysAndCertificate Accepted and Rejected topics\n"); + CreateKeysAndCertificateSubscriptionRequest keySubscriptionRequest; + identityClient.SubscribeToCreateKeysAndCertificateAccepted( + keySubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysAccepted, onKeysAcceptedSubAck); + identityClient.SubscribeToCreateKeysAndCertificateRejected( + keySubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysRejected, onKeysRejectedSubAck); + + // Wait for the subscriptions to the accept and reject keys-and-certificate topics to be established. + ctx.acceptedSubAckPromise.get_future().wait(); + ctx.rejectedSubAckPromise.get_future().wait(); + + // Now, when we subscribed to the keys and certificate topics, we can make a request for a certificate. + fprintf(stdout, "Publishing to CreateKeysAndCertificate topic\n"); + CreateKeysAndCertificateRequest createKeysAndCertificateRequest; + identityClient.PublishCreateKeysAndCertificate( + createKeysAndCertificateRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysPublishPubAck); + ctx.pubAckPromise.get_future().wait(); + + // Wait for a certificate token. + ctx.tokenReceivedPromise.get_future().wait(); +} + +/** + * Certificate-from-CSR workflow. + * + * @note Subscriptions created here will be active even after the function completes. So, all variables accessed in the + * callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of + * CreateCertificateContext is used to store variables used by the callbacks. + */ +void useCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, const String &csrFile) +{ + auto onCsrPublishPubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error publishing to CreateCertificateFromCsr: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + ctx.pubAckPromise.set_value(); + }; + + auto onCsrAcceptedSubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to CreateCertificateFromCsr accepted: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + ctx.acceptedSubAckPromise.set_value(); + }; + + auto onCsrRejectedSubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to CreateCertificateFromCsr rejected: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + ctx.rejectedSubAckPromise.set_value(); + }; + + auto onCsrAccepted = [&ctx](CreateCertificateFromCsrResponse *response, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) + { + fprintf(stdout, "CreateCertificateFromCsrResponse certificateId: %s.\n", response->CertificateId->c_str()); + ctx.token = *response->CertificateOwnershipToken; + ctx.tokenReceivedPromise.set_value(); + } + else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + }; + + auto onCsrRejected = [&](ErrorResponse *error, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) + { + fprintf( + stdout, + "CreateCertificateFromCsr failed with statusCode %d, errorMessage %s and errorCode %s.", + *error->StatusCode, + error->ErrorMessage->c_str(), + error->ErrorCode->c_str()); + exit(-1); + } + else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + }; + + // CreateCertificateFromCsr workflow + fprintf(stdout, "Subscribing to CreateCertificateFromCsr Accepted and Rejected topics\n"); + CreateCertificateFromCsrSubscriptionRequest csrSubscriptionRequest; + identityClient.SubscribeToCreateCertificateFromCsrAccepted( + csrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrAccepted, onCsrAcceptedSubAck); + + identityClient.SubscribeToCreateCertificateFromCsrRejected( + csrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrRejected, onCsrRejectedSubAck); + + // Wait for the subscriptions to the accept and reject certificates topics to be established. + ctx.acceptedSubAckPromise.get_future().wait(); + ctx.rejectedSubAckPromise.get_future().wait(); + + // Now, when we subscribed to the certificates topics, we can make a request for a certificate. + fprintf(stdout, "Publishing to CreateCertificateFromCsr topic\n"); + CreateCertificateFromCsrRequest createCertificateFromCsrRequest; + createCertificateFromCsrRequest.CertificateSigningRequest = csrFile; + identityClient.PublishCreateCertificateFromCsr( + createCertificateFromCsrRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrPublishPubAck); + ctx.pubAckPromise.get_future().wait(); + + // Wait for a certificate token. + ctx.tokenReceivedPromise.get_future().wait(); +} + int main(int argc, char *argv[]) { /************************ Setup ****************************/ @@ -62,7 +238,6 @@ int main(int argc, char *argv[]) ApiHandle apiHandle; // Variables for the sample String csrFile; - String token; RegisterThingResponse registerThingResponse; /** @@ -141,323 +316,127 @@ int main(int argc, char *argv[]) exit(-1); } - if (connectionPromise.get_future().get()) + if (!connectionPromise.get_future().get()) { - IotIdentityClient identityClient(client); - - std::promise csrPublishCompletedPromise; - std::promise csrAcceptedCompletedPromise; - std::promise csrRejectedCompletedPromise; - - std::promise keysPublishCompletedPromise; - std::promise keysAcceptedCompletedPromise; - std::promise keysRejectedCompletedPromise; - - std::promise registerPublishCompletedPromise; - std::promise registerAcceptedCompletedPromise; - std::promise registerRejectedCompletedPromise; - - auto onCsrPublishSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error publishing to CreateCertificateFromCsr: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - - csrPublishCompletedPromise.set_value(); - }; - - auto onCsrAcceptedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf( - stderr, "Error subscribing to CreateCertificateFromCsr accepted: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - - csrAcceptedCompletedPromise.set_value(); - }; - - auto onCsrRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf( - stderr, "Error subscribing to CreateCertificateFromCsr rejected: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - csrRejectedCompletedPromise.set_value(); - }; - - auto onCsrAccepted = [&](CreateCertificateFromCsrResponse *response, int ioErr) { - if (ioErr == AWS_OP_SUCCESS) - { - fprintf( - stdout, "CreateCertificateFromCsrResponse certificateId: %s.\n", response->CertificateId->c_str()); - token = *response->CertificateOwnershipToken; - } - else - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - }; - - auto onCsrRejected = [&](ErrorResponse *error, int ioErr) { - if (ioErr == AWS_OP_SUCCESS) - { - fprintf( - stdout, - "CreateCertificateFromCsr failed with statusCode %d, errorMessage %s and errorCode %s.", - *error->StatusCode, - error->ErrorMessage->c_str(), - error->ErrorCode->c_str()); - exit(-1); - } - else - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - }; - - auto onKeysPublishSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error publishing to CreateKeysAndCertificate: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - - keysPublishCompletedPromise.set_value(); - }; - - auto onKeysAcceptedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf( - stderr, "Error subscribing to CreateKeysAndCertificate accepted: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - - keysAcceptedCompletedPromise.set_value(); - }; - - auto onKeysRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf( - stderr, "Error subscribing to CreateKeysAndCertificate rejected: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - keysRejectedCompletedPromise.set_value(); - }; - - auto onKeysAccepted = [&](CreateKeysAndCertificateResponse *response, int ioErr) { - if (ioErr == AWS_OP_SUCCESS) - { - fprintf( - stdout, "CreateKeysAndCertificateResponse certificateId: %s.\n", response->CertificateId->c_str()); - token = *response->CertificateOwnershipToken; - } - else - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - }; - - auto onKeysRejected = [&](ErrorResponse *error, int ioErr) { - if (ioErr == AWS_OP_SUCCESS) - { - fprintf( - stdout, - "CreateKeysAndCertificate failed with statusCode %d, errorMessage %s and errorCode %s.", - *error->StatusCode, - error->ErrorMessage->c_str(), - error->ErrorCode->c_str()); - exit(-1); - } - else - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - }; - - auto onRegisterAcceptedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } + return -1; + } + IotIdentityClient identityClient(client); + + std::promise registerPublishPubAckCompletedPromise; + std::promise registerAcceptedSubAckCompletedPromise; + std::promise registerRejectedSubAckCompletedPromise; + std::promise registerAcceptedCompletedPromise; + + auto onRegisterAcceptedSubAck = [&](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + + registerAcceptedSubAckCompletedPromise.set_value(); + }; + auto onRegisterRejectedSubAck = [&](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + registerRejectedSubAckCompletedPromise.set_value(); + }; + + auto onRegisterAccepted = [&](RegisterThingResponse *response, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) + { + fprintf(stdout, "RegisterThingResponse ThingName: %s.\n", response->ThingName->c_str()); registerAcceptedCompletedPromise.set_value(); - }; - - auto onRegisterRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - registerRejectedCompletedPromise.set_value(); - }; - - auto onRegisterAccepted = [&](RegisterThingResponse *response, int ioErr) { - if (ioErr == AWS_OP_SUCCESS) - { - fprintf(stdout, "RegisterThingResponse ThingName: %s.\n", response->ThingName->c_str()); - } - else - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - }; - - auto onRegisterRejected = [&](ErrorResponse *error, int ioErr) { - if (ioErr == AWS_OP_SUCCESS) - { - fprintf( - stdout, - "RegisterThing failed with statusCode %d, errorMessage %s and errorCode %s.", - *error->StatusCode, - error->ErrorMessage->c_str(), - error->ErrorCode->c_str()); - } - else - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - }; - - auto onRegisterPublishSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - - registerPublishCompletedPromise.set_value(); - }; - - if (csrFile.empty()) + } + else { - // CreateKeysAndCertificate workflow - std::cout << "Subscribing to CreateKeysAndCertificate Accepted and Rejected topics" << std::endl; - CreateKeysAndCertificateSubscriptionRequest keySubscriptionRequest; - identityClient.SubscribeToCreateKeysAndCertificateAccepted( - keySubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysAccepted, onKeysAcceptedSubAck); - - identityClient.SubscribeToCreateKeysAndCertificateRejected( - keySubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysRejected, onKeysRejectedSubAck); - - std::cout << "Publishing to CreateKeysAndCertificate topic" << std::endl; - CreateKeysAndCertificateRequest createKeysAndCertificateRequest; - identityClient.PublishCreateKeysAndCertificate( - createKeysAndCertificateRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysPublishSubAck); - - std::cout << "Subscribing to RegisterThing Accepted and Rejected topics" << std::endl; - RegisterThingSubscriptionRequest registerSubscriptionRequest; - registerSubscriptionRequest.TemplateName = cmdData.input_templateName; - - identityClient.SubscribeToRegisterThingAccepted( - registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterAccepted, onRegisterAcceptedSubAck); - - identityClient.SubscribeToRegisterThingRejected( - registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterRejected, onRegisterRejectedSubAck); - - sleep(1); - - std::cout << "Publishing to RegisterThing topic" << std::endl; - RegisterThingRequest registerThingRequest; - registerThingRequest.TemplateName = cmdData.input_templateName; - - const Aws::Crt::String jsonValue = cmdData.input_templateParameters; - Aws::Crt::JsonObject value(jsonValue); - Map pm = value.View().GetAllObjects(); - Aws::Crt::Map params = - Aws::Crt::Map(); - - for (const auto &x : pm) - { - params.emplace(x.first, x.second.AsString()); - } - - registerThingRequest.Parameters = params; - registerThingRequest.CertificateOwnershipToken = token; - - identityClient.PublishRegisterThing( - registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishSubAck); - sleep(1); - - keysPublishCompletedPromise.get_future().wait(); - keysAcceptedCompletedPromise.get_future().wait(); - keysRejectedCompletedPromise.get_future().wait(); - registerPublishCompletedPromise.get_future().wait(); - registerAcceptedCompletedPromise.get_future().wait(); - registerRejectedCompletedPromise.get_future().wait(); + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + }; + + auto onRegisterRejected = [&](ErrorResponse *error, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) + { + fprintf( + stdout, + "RegisterThing failed with statusCode %d, errorMessage %s and errorCode %s.", + *error->StatusCode, + error->ErrorMessage->c_str(), + error->ErrorCode->c_str()); } else { - // CreateCertificateFromCsr workflow - std::cout << "Subscribing to CreateCertificateFromCsr Accepted and Rejected topics" << std::endl; - CreateCertificateFromCsrSubscriptionRequest csrSubscriptionRequest; - identityClient.SubscribeToCreateCertificateFromCsrAccepted( - csrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrAccepted, onCsrAcceptedSubAck); - - identityClient.SubscribeToCreateCertificateFromCsrRejected( - csrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrRejected, onCsrRejectedSubAck); - - std::cout << "Publishing to CreateCertificateFromCsr topic" << std::endl; - CreateCertificateFromCsrRequest createCertificateFromCsrRequest; - createCertificateFromCsrRequest.CertificateSigningRequest = csrFile; - identityClient.PublishCreateCertificateFromCsr( - createCertificateFromCsrRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrPublishSubAck); - - std::cout << "Subscribing to RegisterThing Accepted and Rejected topics" << std::endl; - RegisterThingSubscriptionRequest registerSubscriptionRequest; - registerSubscriptionRequest.TemplateName = cmdData.input_templateName; - - identityClient.SubscribeToRegisterThingAccepted( - registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterAccepted, onRegisterAcceptedSubAck); - - identityClient.SubscribeToRegisterThingRejected( - registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterRejected, onRegisterRejectedSubAck); - - sleep(2); - - std::cout << "Publishing to RegisterThing topic" << std::endl; - RegisterThingRequest registerThingRequest; - registerThingRequest.TemplateName = cmdData.input_templateName; - - const Aws::Crt::String jsonValue = cmdData.input_templateParameters; - Aws::Crt::JsonObject value(jsonValue); - Map pm = value.View().GetAllObjects(); - Aws::Crt::Map params = - Aws::Crt::Map(); - - for (const auto &x : pm) - { - params.emplace(x.first, x.second.AsString()); - } - - registerThingRequest.Parameters = params; - registerThingRequest.CertificateOwnershipToken = token; - - identityClient.PublishRegisterThing( - registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishSubAck); - sleep(2); - - csrPublishCompletedPromise.get_future().wait(); - csrAcceptedCompletedPromise.get_future().wait(); - csrRejectedCompletedPromise.get_future().wait(); - registerPublishCompletedPromise.get_future().wait(); - registerAcceptedCompletedPromise.get_future().wait(); - registerRejectedCompletedPromise.get_future().wait(); + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); } + }; + + auto onRegisterPublishPubAck = [&](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + + registerPublishPubAckCompletedPromise.set_value(); + }; + + // Create certificate. + CreateCertificateContext certificateContext; + if (csrFile.empty()) + { + useKeysAndCertificate(identityClient, certificateContext); + } + else + { + useCsr(identityClient, certificateContext, csrFile); } + // After certificate is obtained, it's time to register a thing. + fprintf(stdout, "Subscribing to RegisterThing Accepted and Rejected topics\n"); + RegisterThingSubscriptionRequest registerSubscriptionRequest; + registerSubscriptionRequest.TemplateName = cmdData.input_templateName; + + identityClient.SubscribeToRegisterThingAccepted( + registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterAccepted, onRegisterAcceptedSubAck); + + identityClient.SubscribeToRegisterThingRejected( + registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterRejected, onRegisterRejectedSubAck); + + // Wait for the subscriptions to the accept and reject RegisterThing topics to be established. + registerAcceptedSubAckCompletedPromise.get_future().wait(); + registerRejectedSubAckCompletedPromise.get_future().wait(); + + fprintf(stdout, "Publishing to RegisterThing topic\n"); + RegisterThingRequest registerThingRequest; + registerThingRequest.TemplateName = cmdData.input_templateName; + + const Aws::Crt::String jsonValue = cmdData.input_templateParameters; + Aws::Crt::JsonObject value(jsonValue); + Map pm = value.View().GetAllObjects(); + Aws::Crt::Map params = Aws::Crt::Map(); + + for (const auto &x : pm) + { + params.emplace(x.first, x.second.AsString()); + } + + registerThingRequest.Parameters = params; + // NOTE: In a real application creating multiple certificates you'll probably need to protect token var with + // a critical section. This sample makes only one request for a certificate, so no data race is possible. + registerThingRequest.CertificateOwnershipToken = certificateContext.token; + + identityClient.PublishRegisterThing(registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishPubAck); + registerPublishPubAckCompletedPromise.get_future().wait(); + + // Wait for registering a thing to succeed. + registerAcceptedCompletedPromise.get_future().wait(); + // Disconnect if (client->Stop()) { From a84bcdc645c1ef842633ffb4cb10ccfb5819bb57 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Thu, 11 Jul 2024 17:22:26 -0700 Subject: [PATCH 2/4] Refactor fleet provisioning samples further --- .../fleet_provisioning/main.cpp | 303 ++++++++++-------- .../mqtt5_fleet_provisioning/main.cpp | 286 +++++++++-------- 2 files changed, 331 insertions(+), 258 deletions(-) diff --git a/samples/fleet_provisioning/fleet_provisioning/main.cpp b/samples/fleet_provisioning/fleet_provisioning/main.cpp index 1dbcd0ea2..f4f5fdfd5 100644 --- a/samples/fleet_provisioning/fleet_provisioning/main.cpp +++ b/samples/fleet_provisioning/fleet_provisioning/main.cpp @@ -2,6 +2,14 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ + +/** + * A sample application demonstrating usage of AWS IoT Fleet provisioning. + * + * It's easier to follow a synchronous workflow, when events happen one after another. For that reason, this sample + * performs all actions, like connecting to a server or registering a thing, in synchronous manner. + */ + #include #include @@ -26,14 +34,23 @@ using namespace Aws::Crt; using namespace Aws::Iotidentity; -static std::string getFileData(std::string const &fileName) +static String getFileData(const String &fileName) { - std::ifstream ifs(fileName); + std::ifstream ifs(fileName.c_str()); std::string str; getline(ifs, str, (char)ifs.eof()); - return str; + return str.c_str(); } +/** + * Auxiliary structure for holding data used by MQTT connection. + */ +struct ConnectionContext +{ + std::promise connectionCompletedPromise; + std::promise connectionClosedPromise; +}; + /** * Auxiliary structure for holding data used when creating a certificate. */ @@ -46,6 +63,85 @@ struct CreateCertificateContext String token; }; +/** + * Auxiliary structure for holding data used when registering a thing. + */ +struct RegisterThingContext +{ + std::promise pubAckPromise; + std::promise acceptedSubAckPromise; + std::promise rejectedSubAckPromise; + std::promise thingCreatedPromise; +}; + +/** + * Create MQTT3 connection. + */ +std::shared_ptr createConnection(const Utils::cmdData &cmdData, ConnectionContext &ctx) +{ + /** + * In a real world application you probably don't want to enforce synchronous behavior + * but this is a sample console application, so we'll just do that with a promise. + */ + + // Invoked when a MQTT connect has completed or failed + auto onConnectionCompleted = [&ctx](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) { + if (errorCode) + { + fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode)); + ctx.connectionCompletedPromise.set_value(false); + } + else + { + fprintf(stdout, "Connection completed with return code %d\n", returnCode); + ctx.connectionCompletedPromise.set_value(true); + } + }; + + // Invoked when a disconnect has been completed + auto onDisconnect = [&ctx](Mqtt::MqttConnection & /*conn*/) { + { + fprintf(stdout, "Disconnect completed\n"); + ctx.connectionClosedPromise.set_value(); + } + }; + + // Create the MQTT builder and populate it with data from cmdData. + auto clientConfigBuilder = + Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + clientConfigBuilder.WithEndpoint(cmdData.input_endpoint); + if (cmdData.input_ca != "") + { + clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str()); + } + + // Create the MQTT connection from the MQTT builder + auto clientConfig = clientConfigBuilder.Build(); + if (!clientConfig) + { + fprintf( + stderr, + "Client Configuration initialization failed with error %s\n", + Aws::Crt::ErrorDebugString(clientConfig.LastError())); + exit(-1); + } + Aws::Iot::MqttClient client = Aws::Iot::MqttClient(); + auto connection = client.NewConnection(clientConfig); + if (!*connection) + { + fprintf( + stderr, + "MQTT Connection Creation failed with error %s\n", + Aws::Crt::ErrorDebugString(connection->LastError())); + exit(-1); + } + + connection->OnConnectionCompleted = std::move(onConnectionCompleted); + connection->OnDisconnect = std::move(onDisconnect); + + return connection; +} + /** * Keys-and-Certificate workflow. * @@ -53,7 +149,7 @@ struct CreateCertificateContext * callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of * CreateCertificateContext is used to store variables used by the callbacks. */ -void useKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateContext &ctx) +void createKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateContext &ctx) { auto onKeysPublishPubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) @@ -143,7 +239,7 @@ void useKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateC * callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of * CreateCertificateContext is used to store variables used by the callbacks. */ -void useCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, const String &csrFile) +void createCertificateFromCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, const String &csrFile) { auto onCsrPublishPubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) @@ -229,119 +325,22 @@ void useCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, co ctx.tokenReceivedPromise.get_future().wait(); } -int main(int argc, char *argv[]) +/** + * Provision an AWS IoT thing using a pre-defined template. + */ +void registerThing( + IotIdentityClient &identityClient, + RegisterThingContext &ctx, + const Utils::cmdData &cmdData, + const String &token) { - /************************ Setup ****************************/ - - // Do the global initialization for the API - ApiHandle apiHandle; - // Variables for the sample - String csrFile; - RegisterThingResponse registerThingResponse; - - /** - * cmdData is the arguments/input from the command line placed into a single struct for - * use in this sample. This handles all of the command line parsing, validating, etc. - * See the Utils/CommandLineUtils for more information. - */ - Utils::cmdData cmdData = Utils::parseSampleInputFleetProvisioning(argc, argv, &apiHandle); - - if (cmdData.input_csrPath != "") - { - csrFile = getFileData(cmdData.input_csrPath.c_str()).c_str(); - } - - /** - * In a real world application you probably don't want to enforce synchronous behavior - * but this is a sample console application, so we'll just do that with a promise. - */ - std::promise connectionCompletedPromise; - std::promise connectionClosedPromise; - - // Invoked when a MQTT connect has completed or failed - auto onConnectionCompleted = [&](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) { - if (errorCode) - { - fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode)); - connectionCompletedPromise.set_value(false); - } - else - { - fprintf(stdout, "Connection completed with return code %d\n", returnCode); - connectionCompletedPromise.set_value(true); - } - }; - - // Invoked when a disconnect has been completed - auto onDisconnect = [&](Mqtt::MqttConnection & /*conn*/) { - { - fprintf(stdout, "Disconnect completed\n"); - connectionClosedPromise.set_value(); - } - }; - - // Create the MQTT builder and populate it with data from cmdData. - auto clientConfigBuilder = - Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str()); - clientConfigBuilder.WithEndpoint(cmdData.input_endpoint); - if (cmdData.input_ca != "") - { - clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str()); - } - - // Create the MQTT connection from the MQTT builder - auto clientConfig = clientConfigBuilder.Build(); - if (!clientConfig) - { - fprintf( - stderr, - "Client Configuration initialization failed with error %s\n", - Aws::Crt::ErrorDebugString(clientConfig.LastError())); - exit(-1); - } - Aws::Iot::MqttClient client = Aws::Iot::MqttClient(); - auto connection = client.NewConnection(clientConfig); - if (!*connection) - { - fprintf( - stderr, - "MQTT Connection Creation failed with error %s\n", - Aws::Crt::ErrorDebugString(connection->LastError())); - exit(-1); - } - - connection->OnConnectionCompleted = std::move(onConnectionCompleted); - connection->OnDisconnect = std::move(onDisconnect); - - /************************ Run the sample ****************************/ - - fprintf(stdout, "Connecting...\n"); - if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0)) - { - fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError())); - exit(-1); - } - - if (!connectionCompletedPromise.get_future().get()) - { - return -1; - } - - IotIdentityClient identityClient(connection); - - std::promise registerPublishPubAckCompletedPromise; - std::promise registerAcceptedSubAckCompletedPromise; - std::promise registerRejectedSubAckCompletedPromise; - std::promise registerAcceptedCompletedPromise; - auto onRegisterAcceptedSubAck = [&](int ioErr) { if (ioErr != AWS_OP_SUCCESS) { fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr)); exit(-1); } - - registerAcceptedSubAckCompletedPromise.set_value(); + ctx.acceptedSubAckPromise.set_value(); }; auto onRegisterRejectedSubAck = [&](int ioErr) { @@ -350,14 +349,14 @@ int main(int argc, char *argv[]) fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr)); exit(-1); } - registerRejectedSubAckCompletedPromise.set_value(); + ctx.rejectedSubAckPromise.set_value(); }; auto onRegisterAccepted = [&](RegisterThingResponse *response, int ioErr) { if (ioErr == AWS_OP_SUCCESS) { fprintf(stdout, "RegisterThingResponse ThingName: %s.\n", response->ThingName->c_str()); - registerAcceptedCompletedPromise.set_value(); + ctx.thingCreatedPromise.set_value(); } else { @@ -389,22 +388,9 @@ int main(int argc, char *argv[]) fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr)); exit(-1); } - - registerPublishPubAckCompletedPromise.set_value(); + ctx.pubAckPromise.set_value(); }; - // Create certificate. - CreateCertificateContext certificateContext; - if (csrFile.empty()) - { - useKeysAndCertificate(identityClient, certificateContext); - } - else - { - useCsr(identityClient, certificateContext, csrFile); - } - - // After certificate is obtained, it's time to register a thing. fprintf(stdout, "Subscribing to RegisterThing Accepted and Rejected topics\n"); RegisterThingSubscriptionRequest registerSubscriptionRequest; registerSubscriptionRequest.TemplateName = cmdData.input_templateName; @@ -416,8 +402,8 @@ int main(int argc, char *argv[]) registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterRejected, onRegisterRejectedSubAck); // Wait for the subscriptions to the accept and reject RegisterThing topics to be established. - registerAcceptedSubAckCompletedPromise.get_future().wait(); - registerRejectedSubAckCompletedPromise.get_future().wait(); + ctx.acceptedSubAckPromise.get_future().wait(); + ctx.rejectedSubAckPromise.get_future().wait(); fprintf(stdout, "Publishing to RegisterThing topic\n"); RegisterThingRequest registerThingRequest; @@ -436,18 +422,69 @@ int main(int argc, char *argv[]) registerThingRequest.Parameters = params; // NOTE: In a real application creating multiple certificates you'll probably need to protect token var with // a critical section. This sample makes only one request for a certificate, so no data race is possible. - registerThingRequest.CertificateOwnershipToken = certificateContext.token; + registerThingRequest.CertificateOwnershipToken = token; identityClient.PublishRegisterThing(registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishPubAck); - registerPublishPubAckCompletedPromise.get_future().wait(); + ctx.pubAckPromise.get_future().wait(); // Wait for registering a thing to succeed. - registerAcceptedCompletedPromise.get_future().wait(); + ctx.thingCreatedPromise.get_future().wait(); +} + +int main(int argc, char *argv[]) +{ + /************************ Setup ****************************/ + + // Do the global initialization for the API + ApiHandle apiHandle; + + /** + * cmdData is the arguments/input from the command line placed into a single struct for + * use in this sample. This handles all of the command line parsing, validating, etc. + * See the Utils/CommandLineUtils for more information. + */ + Utils::cmdData cmdData = Utils::parseSampleInputFleetProvisioning(argc, argv, &apiHandle); + + ConnectionContext connectionContext; + auto connection = createConnection(cmdData, connectionContext); + + /************************ Run the sample ****************************/ + + fprintf(stdout, "Connecting...\n"); + if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0)) + { + fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError())); + exit(-1); + } + + if (!connectionContext.connectionCompletedPromise.get_future().get()) + { + exit(-1); + } + + // Create fleet provisioning client. + IotIdentityClient identityClient(connection); + + // Create certificate. + CreateCertificateContext certificateContext; + if (cmdData.input_csrPath != "") + { + auto csrFile = getFileData(cmdData.input_csrPath); + createCertificateFromCsr(identityClient, certificateContext, csrFile); + } + else + { + createKeysAndCertificate(identityClient, certificateContext); + } + + // After certificate is obtained, it's time to register a thing. + RegisterThingContext registerThingContext; + registerThing(identityClient, registerThingContext, cmdData, certificateContext.token); // Disconnect if (connection->Disconnect()) { - connectionClosedPromise.get_future().wait(); + connectionContext.connectionClosedPromise.get_future().wait(); } return 0; diff --git a/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp b/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp index 21ed958a8..c84b8b3b9 100644 --- a/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp +++ b/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp @@ -2,6 +2,14 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ + +/** + * A sample application demonstrating usage of AWS IoT Fleet provisioning with MQTT5 client. + * + * It's easier to follow a synchronous workflow, when events happen one after another. For that reason, this sample + * performs all actions, like connecting to a server or registering a thing, in synchronous manner. + */ + #include #include #include @@ -27,14 +35,25 @@ using namespace Aws::Crt; using namespace Aws::Iotidentity; -static std::string getFileData(std::string const &fileName) +static String getFileData(const String &fileName) { - std::ifstream ifs(fileName); + std::ifstream ifs(fileName.c_str()); std::string str; getline(ifs, str, (char)ifs.eof()); - return str; + return str.c_str(); } +/** + * Auxiliary structure for holding data used by MQTT connection. + */ +struct Mqtt5ClientContext +{ + std::promise connectionPromise; + std::promise stoppedPromise; + std::promise disconnectPromise; + std::promise subscribeSuccess; +}; + /** * Auxiliary structure for holding data used when creating a certificate. */ @@ -47,6 +66,74 @@ struct CreateCertificateContext String token; }; +/** + * Auxiliary structure for holding data used when registering a thing. + */ +struct RegisterThingContext +{ + std::promise pubAckPromise; + std::promise acceptedSubAckPromise; + std::promise rejectedSubAckPromise; + std::promise thingCreatedPromise; +}; + +/** + * Create MQTT5 client. + */ +std::shared_ptr createMqtt5Client(Mqtt5ClientContext &ctx, const Utils::cmdData &cmdData) +{ + // Create the MQTT5 builder and populate it with data from cmdData. + Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + + // Check if the builder setup correctly. + if (builder == nullptr) + { + printf( + "Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError())); + exit(-1); + } + + // Setup connection options + std::shared_ptr connectOptions = std::make_shared(); + connectOptions->WithClientId(cmdData.input_clientId); + builder->WithConnectOptions(connectOptions); + if (cmdData.input_port != 0) + { + builder->WithPort(static_cast(cmdData.input_port)); + } + + // Setup lifecycle callbacks + builder->WithClientConnectionSuccessCallback([&ctx](const Mqtt5::OnConnectionSuccessEventData &eventData) { + fprintf( + stdout, + "Mqtt5 Client connection succeed, clientid: %s.\n", + eventData.negotiatedSettings->getClientId().c_str()); + ctx.connectionPromise.set_value(true); + }); + builder->WithClientConnectionFailureCallback([&ctx](const Mqtt5::OnConnectionFailureEventData &eventData) { + fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); + ctx.connectionPromise.set_value(false); + }); + builder->WithClientStoppedCallback([&ctx](const Mqtt5::OnStoppedEventData &) { + fprintf(stdout, "Mqtt5 Client stopped.\n"); + ctx.stoppedPromise.set_value(); + }); + builder->WithClientAttemptingConnectCallback([](const Mqtt5::OnAttemptingConnectEventData &) { + fprintf(stdout, "Mqtt5 Client attempting connection...\n"); + }); + builder->WithClientDisconnectionCallback([&ctx](const Mqtt5::OnDisconnectionEventData &eventData) { + fprintf(stdout, "Mqtt5 Client disconnection with reason: %s.\n", aws_error_debug_str(eventData.errorCode)); + ctx.disconnectPromise.set_value(); + }); + + // Create Mqtt5Client + std::shared_ptr client = builder->Build(); + delete builder; + + return client; +} + /** * Keys-and-Certificate workflow. * @@ -54,7 +141,7 @@ struct CreateCertificateContext * callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of * CreateCertificateContext is used to store variables used by the callbacks. */ -void useKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateContext &ctx) +void createKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateContext &ctx) { auto onKeysPublishPubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) @@ -144,7 +231,7 @@ void useKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateC * callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of * CreateCertificateContext is used to store variables used by the callbacks. */ -void useCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, const String &csrFile) +void createCertificateFromCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, const String &csrFile) { auto onCsrPublishPubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) @@ -230,111 +317,22 @@ void useCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, co ctx.tokenReceivedPromise.get_future().wait(); } -int main(int argc, char *argv[]) +/** + * Provision an AWS IoT thing using a pre-defined template. + */ +void registerThing( + IotIdentityClient &identityClient, + RegisterThingContext &ctx, + const Utils::cmdData &cmdData, + const String &token) { - /************************ Setup ****************************/ - - // Do the global initialization for the API - ApiHandle apiHandle; - // Variables for the sample - String csrFile; - RegisterThingResponse registerThingResponse; - - /** - * cmdData is the arguments/input from the command line placed into a single struct for - * use in this sample. This handles all of the command line parsing, validating, etc. - * See the Utils/CommandLineUtils for more information. - */ - Utils::cmdData cmdData = Utils::parseSampleInputFleetProvisioning(argc, argv, &apiHandle); - - if (cmdData.input_csrPath != "") - { - csrFile = getFileData(cmdData.input_csrPath.c_str()).c_str(); - } - - // Create the MQTT5 builder and populate it with data from cmdData. - Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( - cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str()); - - // Check if the builder setup correctly. - if (builder == nullptr) - { - printf( - "Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError())); - return -1; - } - - // Setup connection options - std::shared_ptr connectOptions = std::make_shared(); - connectOptions->WithClientId(cmdData.input_clientId); - builder->WithConnectOptions(connectOptions); - if (cmdData.input_port != 0) - { - builder->WithPort(static_cast(cmdData.input_port)); - } - - std::promise connectionPromise; - std::promise stoppedPromise; - std::promise disconnectPromise; - std::promise subscribeSuccess; - - // Setup lifecycle callbacks - builder->WithClientConnectionSuccessCallback( - [&connectionPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) { - fprintf( - stdout, - "Mqtt5 Client connection succeed, clientid: %s.\n", - eventData.negotiatedSettings->getClientId().c_str()); - connectionPromise.set_value(true); - }); - builder->WithClientConnectionFailureCallback([&connectionPromise]( - const Mqtt5::OnConnectionFailureEventData &eventData) { - fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); - connectionPromise.set_value(false); - }); - builder->WithClientStoppedCallback([&stoppedPromise](const Mqtt5::OnStoppedEventData &) { - fprintf(stdout, "Mqtt5 Client stopped.\n"); - stoppedPromise.set_value(); - }); - builder->WithClientAttemptingConnectCallback([](const Mqtt5::OnAttemptingConnectEventData &) { - fprintf(stdout, "Mqtt5 Client attempting connection...\n"); - }); - builder->WithClientDisconnectionCallback([&disconnectPromise](const Mqtt5::OnDisconnectionEventData &eventData) { - fprintf(stdout, "Mqtt5 Client disconnection with reason: %s.\n", aws_error_debug_str(eventData.errorCode)); - disconnectPromise.set_value(); - }); - - // Create Mqtt5Client - std::shared_ptr client = builder->Build(); - delete builder; - /************************ Run the sample ****************************/ - - fprintf(stdout, "Connecting...\n"); - if (!client->Start()) - { - fprintf(stderr, "MQTT5 Connection failed to start"); - exit(-1); - } - - if (!connectionPromise.get_future().get()) - { - return -1; - } - IotIdentityClient identityClient(client); - - std::promise registerPublishPubAckCompletedPromise; - std::promise registerAcceptedSubAckCompletedPromise; - std::promise registerRejectedSubAckCompletedPromise; - std::promise registerAcceptedCompletedPromise; - auto onRegisterAcceptedSubAck = [&](int ioErr) { if (ioErr != AWS_OP_SUCCESS) { fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr)); exit(-1); } - - registerAcceptedSubAckCompletedPromise.set_value(); + ctx.acceptedSubAckPromise.set_value(); }; auto onRegisterRejectedSubAck = [&](int ioErr) { @@ -343,14 +341,14 @@ int main(int argc, char *argv[]) fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr)); exit(-1); } - registerRejectedSubAckCompletedPromise.set_value(); + ctx.rejectedSubAckPromise.set_value(); }; auto onRegisterAccepted = [&](RegisterThingResponse *response, int ioErr) { if (ioErr == AWS_OP_SUCCESS) { fprintf(stdout, "RegisterThingResponse ThingName: %s.\n", response->ThingName->c_str()); - registerAcceptedCompletedPromise.set_value(); + ctx.thingCreatedPromise.set_value(); } else { @@ -382,22 +380,9 @@ int main(int argc, char *argv[]) fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr)); exit(-1); } - - registerPublishPubAckCompletedPromise.set_value(); + ctx.pubAckPromise.set_value(); }; - // Create certificate. - CreateCertificateContext certificateContext; - if (csrFile.empty()) - { - useKeysAndCertificate(identityClient, certificateContext); - } - else - { - useCsr(identityClient, certificateContext, csrFile); - } - - // After certificate is obtained, it's time to register a thing. fprintf(stdout, "Subscribing to RegisterThing Accepted and Rejected topics\n"); RegisterThingSubscriptionRequest registerSubscriptionRequest; registerSubscriptionRequest.TemplateName = cmdData.input_templateName; @@ -409,8 +394,8 @@ int main(int argc, char *argv[]) registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterRejected, onRegisterRejectedSubAck); // Wait for the subscriptions to the accept and reject RegisterThing topics to be established. - registerAcceptedSubAckCompletedPromise.get_future().wait(); - registerRejectedSubAckCompletedPromise.get_future().wait(); + ctx.acceptedSubAckPromise.get_future().wait(); + ctx.rejectedSubAckPromise.get_future().wait(); fprintf(stdout, "Publishing to RegisterThing topic\n"); RegisterThingRequest registerThingRequest; @@ -429,18 +414,69 @@ int main(int argc, char *argv[]) registerThingRequest.Parameters = params; // NOTE: In a real application creating multiple certificates you'll probably need to protect token var with // a critical section. This sample makes only one request for a certificate, so no data race is possible. - registerThingRequest.CertificateOwnershipToken = certificateContext.token; + registerThingRequest.CertificateOwnershipToken = token; identityClient.PublishRegisterThing(registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishPubAck); - registerPublishPubAckCompletedPromise.get_future().wait(); + ctx.pubAckPromise.get_future().wait(); // Wait for registering a thing to succeed. - registerAcceptedCompletedPromise.get_future().wait(); + ctx.thingCreatedPromise.get_future().wait(); +} + +int main(int argc, char *argv[]) +{ + /************************ Setup ****************************/ + + // Do the global initialization for the API + ApiHandle apiHandle; + + /** + * cmdData is the arguments/input from the command line placed into a single struct for + * use in this sample. This handles all of the command line parsing, validating, etc. + * See the Utils/CommandLineUtils for more information. + */ + Utils::cmdData cmdData = Utils::parseSampleInputFleetProvisioning(argc, argv, &apiHandle); + + Mqtt5ClientContext mqtt5ClientContext; + auto client = createMqtt5Client(mqtt5ClientContext, cmdData); + + /************************ Run the sample ****************************/ + + fprintf(stdout, "Connecting...\n"); + if (!client->Start()) + { + fprintf(stderr, "MQTT5 Connection failed to start"); + exit(-1); + } + + if (!mqtt5ClientContext.connectionPromise.get_future().get()) + { + return -1; + } + + // Create fleet provisioning client. + IotIdentityClient identityClient(client); + + // Create certificate. + CreateCertificateContext certificateContext; + if (cmdData.input_csrPath != "") + { + auto csrFile = getFileData(cmdData.input_csrPath); + createCertificateFromCsr(identityClient, certificateContext, csrFile); + } + else + { + createKeysAndCertificate(identityClient, certificateContext); + } + + // After certificate is obtained, it's time to register a thing. + RegisterThingContext registerThingContext; + registerThing(identityClient, registerThingContext, cmdData, certificateContext.token); // Disconnect if (client->Stop()) { - stoppedPromise.get_future().wait(); + mqtt5ClientContext.stoppedPromise.get_future().wait(); } return 0; From 3aee062d86d70203988dd3ac207b94f11b31af75 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Fri, 12 Jul 2024 09:39:34 -0700 Subject: [PATCH 3/4] Use promise for cert token --- .../fleet_provisioning/main.cpp | 20 +++++++------------ .../mqtt5_fleet_provisioning/main.cpp | 20 +++++++------------ 2 files changed, 14 insertions(+), 26 deletions(-) diff --git a/samples/fleet_provisioning/fleet_provisioning/main.cpp b/samples/fleet_provisioning/fleet_provisioning/main.cpp index f4f5fdfd5..105577ecf 100644 --- a/samples/fleet_provisioning/fleet_provisioning/main.cpp +++ b/samples/fleet_provisioning/fleet_provisioning/main.cpp @@ -59,8 +59,7 @@ struct CreateCertificateContext std::promise pubAckPromise; std::promise acceptedSubAckPromise; std::promise rejectedSubAckPromise; - std::promise tokenReceivedPromise; - String token; + std::promise tokenPromise; }; /** @@ -182,8 +181,7 @@ void createKeysAndCertificate(IotIdentityClient &identityClient, CreateCertifica if (ioErr == AWS_OP_SUCCESS) { fprintf(stdout, "CreateKeysAndCertificateResponse certificateId: %s.\n", response->CertificateId->c_str()); - ctx.token = *response->CertificateOwnershipToken; - ctx.tokenReceivedPromise.set_value(); + ctx.tokenPromise.set_value(*response->CertificateOwnershipToken); } else { @@ -227,9 +225,6 @@ void createKeysAndCertificate(IotIdentityClient &identityClient, CreateCertifica identityClient.PublishCreateKeysAndCertificate( createKeysAndCertificateRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysPublishPubAck); ctx.pubAckPromise.get_future().wait(); - - // Wait for a certificate token. - ctx.tokenReceivedPromise.get_future().wait(); } /** @@ -272,8 +267,7 @@ void createCertificateFromCsr(IotIdentityClient &identityClient, CreateCertifica if (ioErr == AWS_OP_SUCCESS) { fprintf(stdout, "CreateCertificateFromCsrResponse certificateId: %s.\n", response->CertificateId->c_str()); - ctx.token = *response->CertificateOwnershipToken; - ctx.tokenReceivedPromise.set_value(); + ctx.tokenPromise.set_value(*response->CertificateOwnershipToken); } else { @@ -320,9 +314,6 @@ void createCertificateFromCsr(IotIdentityClient &identityClient, CreateCertifica identityClient.PublishCreateCertificateFromCsr( createCertificateFromCsrRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrPublishPubAck); ctx.pubAckPromise.get_future().wait(); - - // Wait for a certificate token. - ctx.tokenReceivedPromise.get_future().wait(); } /** @@ -477,9 +468,12 @@ int main(int argc, char *argv[]) createKeysAndCertificate(identityClient, certificateContext); } + // Wait for a certificate token to be obtained. + auto token = certificateContext.tokenPromise.get_future().get(); + // After certificate is obtained, it's time to register a thing. RegisterThingContext registerThingContext; - registerThing(identityClient, registerThingContext, cmdData, certificateContext.token); + registerThing(identityClient, registerThingContext, cmdData, token); // Disconnect if (connection->Disconnect()) diff --git a/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp b/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp index c84b8b3b9..0670619c2 100644 --- a/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp +++ b/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp @@ -62,8 +62,7 @@ struct CreateCertificateContext std::promise pubAckPromise; std::promise acceptedSubAckPromise; std::promise rejectedSubAckPromise; - std::promise tokenReceivedPromise; - String token; + std::promise tokenPromise; }; /** @@ -174,8 +173,7 @@ void createKeysAndCertificate(IotIdentityClient &identityClient, CreateCertifica if (ioErr == AWS_OP_SUCCESS) { fprintf(stdout, "CreateKeysAndCertificateResponse certificateId: %s.\n", response->CertificateId->c_str()); - ctx.token = *response->CertificateOwnershipToken; - ctx.tokenReceivedPromise.set_value(); + ctx.tokenPromise.set_value(*response->CertificateOwnershipToken); } else { @@ -219,9 +217,6 @@ void createKeysAndCertificate(IotIdentityClient &identityClient, CreateCertifica identityClient.PublishCreateKeysAndCertificate( createKeysAndCertificateRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysPublishPubAck); ctx.pubAckPromise.get_future().wait(); - - // Wait for a certificate token. - ctx.tokenReceivedPromise.get_future().wait(); } /** @@ -264,8 +259,7 @@ void createCertificateFromCsr(IotIdentityClient &identityClient, CreateCertifica if (ioErr == AWS_OP_SUCCESS) { fprintf(stdout, "CreateCertificateFromCsrResponse certificateId: %s.\n", response->CertificateId->c_str()); - ctx.token = *response->CertificateOwnershipToken; - ctx.tokenReceivedPromise.set_value(); + ctx.tokenPromise.set_value(*response->CertificateOwnershipToken); } else { @@ -312,9 +306,6 @@ void createCertificateFromCsr(IotIdentityClient &identityClient, CreateCertifica identityClient.PublishCreateCertificateFromCsr( createCertificateFromCsrRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrPublishPubAck); ctx.pubAckPromise.get_future().wait(); - - // Wait for a certificate token. - ctx.tokenReceivedPromise.get_future().wait(); } /** @@ -469,9 +460,12 @@ int main(int argc, char *argv[]) createKeysAndCertificate(identityClient, certificateContext); } + // Wait for a certificate token to be obtained. + auto token = certificateContext.tokenPromise.get_future().get(); + // After certificate is obtained, it's time to register a thing. RegisterThingContext registerThingContext; - registerThing(identityClient, registerThingContext, cmdData, certificateContext.token); + registerThing(identityClient, registerThingContext, cmdData, token); // Disconnect if (client->Stop()) From 6a2a74deeba1c6f785f32ca72363307ec3f424d5 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Fri, 12 Jul 2024 10:04:01 -0700 Subject: [PATCH 4/4] Fix lambda captures --- .../fleet_provisioning/fleet_provisioning/main.cpp | 14 +++++++------- .../mqtt5_fleet_provisioning/main.cpp | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/samples/fleet_provisioning/fleet_provisioning/main.cpp b/samples/fleet_provisioning/fleet_provisioning/main.cpp index 105577ecf..02b307f7e 100644 --- a/samples/fleet_provisioning/fleet_provisioning/main.cpp +++ b/samples/fleet_provisioning/fleet_provisioning/main.cpp @@ -190,7 +190,7 @@ void createKeysAndCertificate(IotIdentityClient &identityClient, CreateCertifica } }; - auto onKeysRejected = [&](ErrorResponse *error, int ioErr) { + auto onKeysRejected = [](ErrorResponse *error, int ioErr) { if (ioErr == AWS_OP_SUCCESS) { fprintf( @@ -276,7 +276,7 @@ void createCertificateFromCsr(IotIdentityClient &identityClient, CreateCertifica } }; - auto onCsrRejected = [&](ErrorResponse *error, int ioErr) { + auto onCsrRejected = [](ErrorResponse *error, int ioErr) { if (ioErr == AWS_OP_SUCCESS) { fprintf( @@ -325,7 +325,7 @@ void registerThing( const Utils::cmdData &cmdData, const String &token) { - auto onRegisterAcceptedSubAck = [&](int ioErr) { + auto onRegisterAcceptedSubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) { fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr)); @@ -334,7 +334,7 @@ void registerThing( ctx.acceptedSubAckPromise.set_value(); }; - auto onRegisterRejectedSubAck = [&](int ioErr) { + auto onRegisterRejectedSubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) { fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr)); @@ -343,7 +343,7 @@ void registerThing( ctx.rejectedSubAckPromise.set_value(); }; - auto onRegisterAccepted = [&](RegisterThingResponse *response, int ioErr) { + auto onRegisterAccepted = [&ctx](RegisterThingResponse *response, int ioErr) { if (ioErr == AWS_OP_SUCCESS) { fprintf(stdout, "RegisterThingResponse ThingName: %s.\n", response->ThingName->c_str()); @@ -356,7 +356,7 @@ void registerThing( } }; - auto onRegisterRejected = [&](ErrorResponse *error, int ioErr) { + auto onRegisterRejected = [](ErrorResponse *error, int ioErr) { if (ioErr == AWS_OP_SUCCESS) { fprintf( @@ -373,7 +373,7 @@ void registerThing( } }; - auto onRegisterPublishPubAck = [&](int ioErr) { + auto onRegisterPublishPubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) { fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr)); diff --git a/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp b/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp index 0670619c2..3ea7ef274 100644 --- a/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp +++ b/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp @@ -182,7 +182,7 @@ void createKeysAndCertificate(IotIdentityClient &identityClient, CreateCertifica } }; - auto onKeysRejected = [&](ErrorResponse *error, int ioErr) { + auto onKeysRejected = [](ErrorResponse *error, int ioErr) { if (ioErr == AWS_OP_SUCCESS) { fprintf( @@ -268,7 +268,7 @@ void createCertificateFromCsr(IotIdentityClient &identityClient, CreateCertifica } }; - auto onCsrRejected = [&](ErrorResponse *error, int ioErr) { + auto onCsrRejected = [](ErrorResponse *error, int ioErr) { if (ioErr == AWS_OP_SUCCESS) { fprintf( @@ -317,7 +317,7 @@ void registerThing( const Utils::cmdData &cmdData, const String &token) { - auto onRegisterAcceptedSubAck = [&](int ioErr) { + auto onRegisterAcceptedSubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) { fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr)); @@ -326,7 +326,7 @@ void registerThing( ctx.acceptedSubAckPromise.set_value(); }; - auto onRegisterRejectedSubAck = [&](int ioErr) { + auto onRegisterRejectedSubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) { fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr)); @@ -335,7 +335,7 @@ void registerThing( ctx.rejectedSubAckPromise.set_value(); }; - auto onRegisterAccepted = [&](RegisterThingResponse *response, int ioErr) { + auto onRegisterAccepted = [&ctx](RegisterThingResponse *response, int ioErr) { if (ioErr == AWS_OP_SUCCESS) { fprintf(stdout, "RegisterThingResponse ThingName: %s.\n", response->ThingName->c_str()); @@ -348,7 +348,7 @@ void registerThing( } }; - auto onRegisterRejected = [&](ErrorResponse *error, int ioErr) { + auto onRegisterRejected = [](ErrorResponse *error, int ioErr) { if (ioErr == AWS_OP_SUCCESS) { fprintf( @@ -365,7 +365,7 @@ void registerThing( } }; - auto onRegisterPublishPubAck = [&](int ioErr) { + auto onRegisterPublishPubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) { fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr));