diff --git a/.builder/actions/build_samples.py b/.builder/actions/build_samples.py index 34421a30f..b56b51209 100644 --- a/.builder/actions/build_samples.py +++ b/.builder/actions/build_samples.py @@ -27,6 +27,7 @@ def run(self, env): 'samples/mqtt/custom_authorizer_connect', 'samples/mqtt/cognito_connect', 'samples/mqtt5/mqtt5_pubsub', + 'samples/mqtt5/mqtt5_shared_subscription', "samples/pub_sub/basic_pub_sub", "samples/pub_sub/cycle_pub_sub", 'samples/secure_tunneling/secure_tunnel', diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 387dfd70b..f3a0a4f4d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -451,6 +451,9 @@ jobs: - name: run MQTT5 PubSub sample run: | python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_mqtt5_pubsub_cfg.json + - name: run MQTT5 Shared Subscription sample + run: | + python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_mqtt5_shared_subscription_cfg.json - name: configure AWS credentials (Cognito) uses: aws-actions/configure-aws-credentials@v1 with: diff --git a/.github/workflows/ci_run_mqtt5_shared_subscription_cfg.json b/.github/workflows/ci_run_mqtt5_shared_subscription_cfg.json new file mode 100644 index 000000000..d64afd9f1 --- /dev/null +++ b/.github/workflows/ci_run_mqtt5_shared_subscription_cfg.json @@ -0,0 +1,26 @@ +{ + "language": "CPP", + "sample_file": "./aws-iot-device-sdk-cpp-v2/build/samples/mqtt5/mqtt5_shared_subscription/mqtt5_shared_subscription", + "sample_region": "us-east-1", + "sample_main_class": "", + "arguments": [ + { + "name": "--endpoint", + "secret": "ci/endpoint" + }, + { + "name": "--cert", + "secret": "ci/mqtt5/us/mqtt5_thing/cert", + "filename": "tmp_certificate.pem" + }, + { + "name": "--key", + "secret": "ci/mqtt5/us/mqtt5_thing/key", + "filename": "tmp_key.pem" + }, + { + "name": "--is_ci", + "data": "true" + } + ] +} diff --git a/samples/CMakeLists.txt b/samples/CMakeLists.txt index cdebb9036..7c1f9a91d 100644 --- a/samples/CMakeLists.txt +++ b/samples/CMakeLists.txt @@ -16,6 +16,7 @@ add_subdirectory(mqtt/windows_cert_connect) add_subdirectory(mqtt/x509_credentials_provider_connect) add_subdirectory(mqtt/cognito_connect) add_subdirectory(mqtt5/mqtt5_pubsub) +add_subdirectory(mqtt5/mqtt5_shared_subscription) add_subdirectory(pub_sub/basic_pub_sub) add_subdirectory(pub_sub/cycle_pub_sub) add_subdirectory(secure_tunneling/secure_tunnel) diff --git a/samples/README.md b/samples/README.md index 7e85d7867..bfcc58ac0 100644 --- a/samples/README.md +++ b/samples/README.md @@ -3,6 +3,7 @@ * [Basic Pub-Sub](./pub_sub/basic_pub_sub/README.md) * [Basic Connect](./mqtt/basic_connect/README.md) * [Mqtt5 Pub-Sub](./mqtt5/mqtt5_pubsub/README.md) +* [Mqtt5 Shared Subscription](./mqtt5/mqtt5_shared_subscription/README.md) * [Websocket Connect](./mqtt/websocket_connect/README.md) * [PKCS#11 Connect](./mqtt/pkcs11_connect/README.md) * [Raw Connect](./mqtt/raw_connect/README.md) diff --git a/samples/mqtt5/mqtt5_shared_subscription/CMakeLists.txt b/samples/mqtt5/mqtt5_shared_subscription/CMakeLists.txt new file mode 100644 index 000000000..b366a1618 --- /dev/null +++ b/samples/mqtt5/mqtt5_shared_subscription/CMakeLists.txt @@ -0,0 +1,25 @@ +cmake_minimum_required(VERSION 3.1) +# note: cxx-17 requires cmake 3.8, cxx-20 requires cmake 3.12 +project(mqtt5_shared_subscription CXX) + +file(GLOB SRC_FILES + "*.cpp" + "../../utils/CommandLineUtils.cpp" + "../../utils/CommandLineUtils.h" +) + +add_executable(${PROJECT_NAME} ${SRC_FILES}) + +set_target_properties(${PROJECT_NAME} PROPERTIES + CXX_STANDARD 14) + +#set warnings +if (MSVC) + target_compile_options(${PROJECT_NAME} PRIVATE /W4 /WX /wd4068) +else () + target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wno-long-long -pedantic -Werror) +endif () + +find_package(aws-crt-cpp REQUIRED) + +target_link_libraries(${PROJECT_NAME} AWS::aws-crt-cpp) diff --git a/samples/mqtt5/mqtt5_shared_subscription/README.md b/samples/mqtt5/mqtt5_shared_subscription/README.md new file mode 100644 index 000000000..19fdd2e18 --- /dev/null +++ b/samples/mqtt5/mqtt5_shared_subscription/README.md @@ -0,0 +1,96 @@ +# MQTT5 Shared Subscription + +[**Return to main sample list**](../../README.md) + +This sample uses the +[Message Broker](https://docs.aws.amazon.com/iot/latest/developerguide/iot-message-broker.html) +for AWS IoT to send and receive messages through an MQTT connection using MQTT5 using a Shared Subscription. + +MQTT5 introduces additional features and enhancements that improve the development experience with MQTT. You can read more about MQTT5 in the C++ V2 SDK by checking out the [MQTT5 user guide](../../../documents/MQTT5_Userguide.md). + +Note: MQTT5 support is currently in **developer preview**. We encourage feedback at all times, but feedback during the preview window is especially valuable in shaping the final product. During the preview period we may make backwards-incompatible changes to the public API, but in general, this is something we will try our best to avoid. + +Shared Subscriptions allow IoT devices to connect to a group where messages sent to a topic are then relayed to the group in a round-robin-like fashion. This is useful for distributing message load across multiple subscribing MQTT5 clients automatically. This is helpful for load balancing when you have many messages that need to processed. + +Shared Subscriptions rely on what is called a group identifier, which tells the MQTT5 broker/server which IoT devices are in what group. This is done when subscribing by formatting the subscription topic like the following: `$share//`. +* `$share`: Tells the MQTT5 broker/server that the device is subscribing to a Shared Subscription. +* ``: Tells the MQTT5 broker/server which group to add this Shared Subscription to. THis is the group of MQTT5 clients that will be worked through as part of the round-robin when a message comes in. For example: `my-iot-group`. +* ``: The topic that the Shared Subscription is for. Messages published to this topic will be processed in a round-robin fashion. For example, `test/topic`. + +As mentioned, Shared Subscriptions use a round-robbin like method of distributing messages. For example, say you have three MQTT5 clients all subscribed to the same Shared Subscription group and topic. If five messages are sent to the Shared Subscription topic, the messages will likely be delivered in the following order: +* Message 1 -> Client one +* Message 2 -> Client two +* Message 3 -> Client three +* Message 4 -> Client one +* Message 5 -> Client two +* etc... + +Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect, subscribe, publish, and receive. Below is a sample policy that can be used on your IoT Core Thing that will allow this sample to run as intended. + +
+(see sample policy) +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Publish",
+        "iot:Receive"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topic/test/topic",
+        "arn:aws:iot:region:account:topic/$share/*/test/topic"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Subscribe"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topicfilter/test/topic",
+        "arn:aws:iot:region:account:topicfilter/$share/*/test/topic"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Connect"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:client/test-*"
+      ]
+    }
+  ]
+}
+
+ +Replace with the following with the data from your AWS account: +* ``: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`. +* ``: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website. + +Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` to send the client ID your policy supports. + +
+ +## How to run + +To Run this sample using a direct MQTT connection with a key and certificate, use the following command: + +``` sh +./mqtt5_shared_subscription --endpoint --cert --key +``` + +You can also pass a Certificate Authority file (CA) if your certificate and key combination requires it: + +``` sh +./mqtt5_shared_subscription --endpoint --cert --key --ca_file +``` + +Finally, you can also set the Shared Subscription group identifier and topic with `--group_identifier` and `--topic` respectively: + +``` sh +./mqtt5_shared_subscription --endpoint --cert --key --group_identifier --topic +``` diff --git a/samples/mqtt5/mqtt5_shared_subscription/main.cpp b/samples/mqtt5/mqtt5_shared_subscription/main.cpp new file mode 100644 index 000000000..8b648ed49 --- /dev/null +++ b/samples/mqtt5/mqtt5_shared_subscription/main.cpp @@ -0,0 +1,513 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#include +#include +#include +#include + +#include + +#include "../../utils/CommandLineUtils.h" + +using namespace Aws::Crt; + +/** + * MQTT5 support is currently in developer preview. We encourage feedback at all times, but feedback during the + * preview window is especially valuable in shaping the final product. During the preview period we may make + * backwards-incompatible changes to the public API, but in general, this is something we will try our best to avoid. + */ + +/** + * For the purposes of this sample, we need to associate certain variables with a particular MQTT5 client + * and to do so we use this class to hold all the data for a particular client used in the sample. + */ +class sample_mqtt5_client +{ + public: + std::shared_ptr client; + String name; + std::promise connectionPromise; + std::promise stoppedPromise; + std::mutex receiveMutex; + std::condition_variable receiveSignal; + uint32_t receivedMessages; + uint32_t expectedMessages; + bool sharedSubscriptionSupportNotAvailable; + + /** + * A helper function to print a message and then exit the sample. + */ + void PrintMessageAndExit(String message, int exitCode) + { + fprintf(stdout, "[%s]: %s\n", this->name.c_str(), message.c_str()); + exit(exitCode); + } + + /** + * Creates a MQTT5 client using direct MQTT5 via mTLS with the passed input data. + */ + static std::shared_ptr create_mqtt5_client( + String input_endpoint, + String input_cert, + String input_key, + String input_ca, + String input_clientId, + uint32_t input_count, + String input_clientName) + { + std::shared_ptr result = std::make_shared(); + + result->receivedMessages = 0; + result->expectedMessages = input_count; + result->name = input_clientName; + result->sharedSubscriptionSupportNotAvailable = false; + + Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + input_endpoint, input_cert.c_str(), input_key.c_str()); + if (builder == nullptr) + { + return nullptr; + } + + if (input_ca != "") + { + builder->WithCertificateAuthority(input_ca.c_str()); + } + std::shared_ptr connectOptions = std::make_shared(); + connectOptions->withClientId(input_clientId); + builder->withConnectOptions(connectOptions); + + builder->withClientConnectionSuccessCallback( + [result](Mqtt5::Mqtt5Client &, const Mqtt5::OnConnectionSuccessEventData &eventData) { + fprintf( + stdout, + "[%s]: Connection succeed, clientid: %s.\n", + result->name.c_str(), + eventData.negotiatedSettings->getClientId().c_str()); + + try + { + result->connectionPromise.set_value(true); + } + catch (...) + { + fprintf( + stdout, + "[%s]: Exception ocurred trying to set connection promise (likely already set)\n", + result->name.c_str()); + } + }); + + builder->withClientConnectionFailureCallback( + [result](Mqtt5::Mqtt5Client &, const Mqtt5::OnConnectionFailureEventData &eventData) { + fprintf( + stdout, + "[%s]: Connection failed with error: %s.\n", + result->name.c_str(), + aws_error_debug_str(eventData.errorCode)); + + try + { + result->connectionPromise.set_value(false); + } + catch (...) + { + fprintf( + stdout, + "[%s]: Exception ocurred trying to set connection promise (likely already set)\n", + result->name.c_str()); + } + }); + + builder->withClientStoppedCallback([result](Mqtt5::Mqtt5Client &, const Mqtt5::OnStoppedEventData &) { + fprintf(stdout, "[%s]: Stopped.\n", result->name.c_str()); + result->stoppedPromise.set_value(); + }); + + builder->withClientDisconnectionCallback( + [result](Mqtt5::Mqtt5Client &, const Mqtt5::OnDisconnectionEventData &eventData) { + fprintf( + stdout, + "[%s]: Disconnection with reason: %s.\n", + result->name.c_str(), + aws_error_debug_str(eventData.errorCode)); + if (eventData.disconnectPacket != nullptr) + { + fprintf(stdout, "\tReason code: %u\n", (uint32_t)eventData.disconnectPacket->getReasonCode()); + if (eventData.disconnectPacket->getReasonCode() == + Mqtt5::DisconnectReasonCode::AWS_MQTT5_DRC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED) + { + result->sharedSubscriptionSupportNotAvailable = true; + } + } + }); + + builder->withPublishReceivedCallback([result]( + Mqtt5::Mqtt5Client & /*client*/, + const Mqtt5::PublishReceivedEventData &eventData) { + fprintf(stdout, "[%s]: Received a publish\n", result->name.c_str()); + + std::lock_guard lock(result->receiveMutex); + if (eventData.publishPacket != nullptr) + { + fprintf(stdout, "\tPublish received on topic: %s\n", eventData.publishPacket->getTopic().c_str()); + fprintf(stdout, "\tMessage: "); + fwrite(eventData.publishPacket->getPayload().ptr, 1, eventData.publishPacket->getPayload().len, stdout); + fprintf(stdout, "\n"); + + for (Mqtt5::UserProperty prop : eventData.publishPacket->getUserProperties()) + { + fprintf(stdout, "\t\twith UserProperty:(%s,%s)\n", prop.getName().c_str(), prop.getValue().c_str()); + } + } + result->receivedMessages += 1; + if (result->receivedMessages > result->expectedMessages) + { + result->receiveSignal.notify_all(); + } + }); + + result->client = builder->Build(); + delete builder; + return result; + } +}; + +int main(int argc, char *argv[]) +{ + /************************ Setup the Lib ****************************/ + /* + * Do the global initialization for the API. + */ + ApiHandle apiHandle; + + /*********************** Setup Arguments ***************************/ + Utils::CommandLineUtils cmdUtils = Utils::CommandLineUtils(); + cmdUtils.RegisterProgramName("mqtt5_shared_subscription"); + cmdUtils.AddCommonMQTTCommands(); + cmdUtils.RegisterCommand("key", "", "Path to your key in PEM format."); + cmdUtils.RegisterCommand("cert", "", "Path to your client certificate in PEM format."); + cmdUtils.AddCommonProxyCommands(); + cmdUtils.AddCommonTopicMessageCommands(); + cmdUtils.RegisterCommand("client_id", "", "Client id to use (optional, default='test-*')"); + cmdUtils.RegisterCommand("count", "", "The number of messages to send (optional, default='10')"); + cmdUtils.RegisterCommand( + "group_identifier", + "", + "The group identifier to use in the shared subscription (optional, default='cpp-sample')"); + cmdUtils.RegisterCommand( + "is_ci", "", "If present the sample will run in CI mode (will publish to shadow automatically)."); + cmdUtils.AddLoggingCommands(); + const char **const_argv = (const char **)argv; + cmdUtils.SendArguments(const_argv, const_argv + argc); + cmdUtils.StartLoggingBasedOnCommand(&apiHandle); + + /*********************** Pull data from arguments ***************************/ + + String input_endpoint = cmdUtils.GetCommandRequired("endpoint"); + String input_cert = cmdUtils.GetCommandRequired("cert"); + String input_key = cmdUtils.GetCommandRequired("key"); + String input_ca = cmdUtils.GetCommandOrDefault("ca_file", ""); + String input_clientId = cmdUtils.GetCommandOrDefault("client_id", String("test-") + Aws::Crt::UUID().ToString()); + uint32_t input_count = 10; + String input_topic = cmdUtils.GetCommandOrDefault("topic", "test/topic"); + String input_message = cmdUtils.GetCommandOrDefault("message", "Hello World! "); + String input_groupIdentifier = cmdUtils.GetCommandOrDefault("group_identifier", "cpp-sample"); + bool input_isCI = cmdUtils.HasCommand("is_ci"); + + if (cmdUtils.HasCommand("count")) + { + int count = atoi(cmdUtils.GetCommand("count").c_str()); + if (count > 0) + { + input_count = count; + } + } + + // If this is CI, append a UUID to the topic + if (input_isCI) + { + input_topic = input_topic + Aws::Crt::UUID().ToString(); + } + + // Construct the shared topic + String input_sharedTopic = String("$share/") + input_groupIdentifier + String("/") + input_topic; + + // Make sure the message count is even + if (input_count % 2 != 0) + { + fprintf(stdout, "'--count' is an odd number. '--count' must be even or zero for this sample."); + exit(-1); + } + + /*********************** Create the MQTT5 clients: one publisher and two subscribers ***************************/ + + std::shared_ptr publisher = sample_mqtt5_client::create_mqtt5_client( + input_endpoint, + input_cert, + input_key, + input_ca, + input_clientId + String("1"), + input_count / 2, + String("Publisher")); + std::shared_ptr subscriberOne = sample_mqtt5_client::create_mqtt5_client( + input_endpoint, + input_cert, + input_key, + input_ca, + input_clientId + String("2"), + input_count / 2, + String("Subscriber One")); + std::shared_ptr subscriberTwo = sample_mqtt5_client::create_mqtt5_client( + input_endpoint, + input_cert, + input_key, + input_ca, + input_clientId + String("3"), + input_count / 2, + String("Subscriber Two")); + + if (publisher == nullptr || subscriberOne == nullptr || subscriberTwo == nullptr) + { + fprintf(stdout, "Client creation failed.\n"); + exit(-1); + } + + /*********************** Connect the MQTT5 clients ***************************/ + + if (publisher->client->Start()) + { + if (publisher->connectionPromise.get_future().get() == false) + { + publisher->PrintMessageAndExit("Connection was unsuccessful", -1); + } + } + else + { + publisher->PrintMessageAndExit("Could not start", -1); + } + + if (subscriberOne->client->Start()) + { + if (subscriberOne->connectionPromise.get_future().get() == false) + { + subscriberOne->PrintMessageAndExit("Connection was unsuccessful", -1); + } + } + else + { + subscriberOne->PrintMessageAndExit("Could not start", -1); + } + + if (subscriberTwo->client->Start()) + { + if (subscriberTwo->connectionPromise.get_future().get() == false) + { + subscriberTwo->PrintMessageAndExit("Connection was unsuccessful", -1); + } + } + else + { + subscriberTwo->PrintMessageAndExit("Could not start", -1); + } + + /*********************** Subscribe the two subscriber MQTT5 clients ***************************/ + + std::promise subscribeSuccess; + auto onSubAck = + [&subscribeSuccess]( + std::shared_ptr, int error_code, std::shared_ptr suback) { + if (error_code != 0) + { + fprintf( + stdout, + "MQTT5 Client Subscription failed with error code: (%d)%s\n", + error_code, + aws_error_debug_str(error_code)); + subscribeSuccess.set_value(Mqtt5::SubAckReasonCode::AWS_MQTT5_SARC_UNSPECIFIED_ERROR); + } + if (suback != nullptr) + { + for (Mqtt5::SubAckReasonCode reasonCode : suback->getReasonCodes()) + { + if (reasonCode >= Mqtt5::SubAckReasonCode::AWS_MQTT5_SARC_UNSPECIFIED_ERROR) + { + fprintf( + stdout, + "MQTT5 Client Subscription failed with server error code: (%d)%s\n", + reasonCode, + suback->getReasonString()->c_str()); + subscribeSuccess.set_value(reasonCode); + return; + } + } + } + subscribeSuccess.set_value(Mqtt5::SubAckReasonCode::AWS_MQTT5_SARC_GRANTED_QOS_1); + }; + Mqtt5::Subscription sub1(input_sharedTopic, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE); + sub1.withNoLocal(false); + std::shared_ptr subPacket = std::make_shared(); + subPacket->withSubscription(std::move(sub1)); + + if (subscriberOne->client->Subscribe(subPacket, onSubAck)) + { + // Wait just a little bit to see if the client was disconnected due to missing Shared Subscription support. + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + if (subscriberOne->sharedSubscriptionSupportNotAvailable == true) + { + if (input_isCI == true) + { + // TMP: If this fails subscribing in CI, just exit the sample gracefully + subscriberOne->PrintMessageAndExit("Shared Subscriptions not supported", 0); + } + subscriberOne->PrintMessageAndExit("Shared Subscriptions not supported", -1); + } + Mqtt5::SubAckReasonCode result = subscribeSuccess.get_future().get(); + if (result >= Mqtt5::SubAckReasonCode::AWS_MQTT5_SARC_UNSPECIFIED_ERROR) + { + subscriberOne->PrintMessageAndExit("Failed to subscribe", -1); + } + } + else + { + subscriberOne->PrintMessageAndExit("Failed to subscribe", -1); + } + + subscribeSuccess = std::promise(); + if (subscriberTwo->client->Subscribe(subPacket, onSubAck)) + { + // Wait just a little bit to see if the client was disconnected due to missing Shared Subscription support. + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + if (subscriberTwo->sharedSubscriptionSupportNotAvailable == true) + { + if (input_isCI == true) + { + // TMP: If this fails subscribing in CI, just exit the sample gracefully + subscriberTwo->PrintMessageAndExit("Shared Subscriptions not supported", 0); + } + subscriberTwo->PrintMessageAndExit("Shared Subscriptions not supported", -1); + } + Mqtt5::SubAckReasonCode result = subscribeSuccess.get_future().get(); + if (result >= Mqtt5::SubAckReasonCode::AWS_MQTT5_SARC_UNSPECIFIED_ERROR) + { + subscriberTwo->PrintMessageAndExit("Failed to subscribe", -1); + } + } + else + { + subscriberTwo->PrintMessageAndExit("Failed to subscribe", -1); + } + + /*********************** Publish ***************************/ + + auto onPublishComplete = [publisher]( + std::shared_ptr client, + int, + std::shared_ptr result) { + if (!result->wasSuccessful()) + { + fprintf(stdout, "[%s] Publish failed with error_code: %d", publisher->name.c_str(), result->getErrorCode()); + } + else if (result != nullptr) + { + std::shared_ptr puback = + std::dynamic_pointer_cast(result->getAck()); + if (puback->getReasonCode() == 0) + { + fprintf(stdout, "[%s]: Publish Succeed.\n", publisher->name.c_str()); + } + else + { + fprintf( + stdout, + "[%s] Publish failed. PubACK reason code: %d : %s\n", + publisher->name.c_str(), + puback->getReasonCode(), + puback->getReasonString()->c_str()); + } + }; + }; + + uint32_t publishedCount = 0; + while (publishedCount < input_count) + { + String message = input_message + std::to_string(publishedCount + 1).c_str(); + ByteCursor payload = ByteCursorFromString(message); + std::shared_ptr publish = + std::make_shared(input_topic, payload, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE); + if (publisher->client->Publish(publish, onPublishComplete)) + { + ++publishedCount; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + /*********************** Make sure all the messages got to the subscribers ***************************/ + + std::unique_lock receivedLockOne(subscriberOne->receiveMutex); + subscriberOne->receiveSignal.wait(receivedLockOne, [&, subscriberOne] { + return subscriberOne->receivedMessages >= subscriberOne->expectedMessages; + }); + std::unique_lock receivedLockTwo(subscriberTwo->receiveMutex); + subscriberTwo->receiveSignal.wait(receivedLockTwo, [&, subscriberTwo] { + return subscriberTwo->receivedMessages >= subscriberTwo->expectedMessages; + }); + + /*********************** Unsubscribe the subscribers ***************************/ + + std::promise unsubscribeFinishedPromise; + std::shared_ptr unsub = std::make_shared(); + unsub->withTopicFilter(input_sharedTopic); + if (!subscriberOne->client->Unsubscribe( + unsub, [&](std::shared_ptr, int, std::shared_ptr) { + unsubscribeFinishedPromise.set_value(); + })) + { + subscriberOne->PrintMessageAndExit("Unsubscribe failed", -1); + } + unsubscribeFinishedPromise.get_future().wait(); + + unsubscribeFinishedPromise = std::promise(); + if (!subscriberTwo->client->Unsubscribe( + unsub, [&](std::shared_ptr, int, std::shared_ptr) { + unsubscribeFinishedPromise.set_value(); + })) + { + subscriberTwo->PrintMessageAndExit("Unsubscribe failed", -1); + } + unsubscribeFinishedPromise.get_future().wait(); + + /*********************** Disconnect all the MQTT5 clients ***************************/ + + if (!publisher->client->Stop()) + { + publisher->PrintMessageAndExit("Failed to disconnect. Exiting...", -1); + } + publisher->stoppedPromise.get_future().wait(); + + if (!subscriberOne->client->Stop()) + { + subscriberOne->PrintMessageAndExit("Failed to disconnect. Exiting...", -1); + } + subscriberOne->stoppedPromise.get_future().wait(); + + if (!subscriberTwo->client->Stop()) + { + subscriberTwo->PrintMessageAndExit("Failed to disconnect. Exiting...", -1); + } + subscriberTwo->stoppedPromise.get_future().wait(); + + /*********************** Free the shared pointers (MQTT5 clients) ***************************/ + publisher->client = nullptr; + subscriberOne->client = nullptr; + subscriberTwo->client = nullptr; + publisher = nullptr; + subscriberOne = nullptr; + subscriberTwo = nullptr; + + fprintf(stdout, "Complete!\n"); + exit(0); +}