From 6f65a7c5c6bc76de2a64908811a94aae44bfb472 Mon Sep 17 00:00:00 2001 From: Noah Beard Date: Fri, 21 Apr 2023 14:14:18 -0400 Subject: [PATCH 1/3] Adjust shared subscription sample to better fit docs --- .../mqtt5/mqtt5_shared_subscription/README.md | 10 +-- .../mqtt5/mqtt5_shared_subscription/main.cpp | 84 +++++++++---------- 2 files changed, 45 insertions(+), 49 deletions(-) diff --git a/samples/mqtt5/mqtt5_shared_subscription/README.md b/samples/mqtt5/mqtt5_shared_subscription/README.md index 787670172..65190ddb5 100644 --- a/samples/mqtt5/mqtt5_shared_subscription/README.md +++ b/samples/mqtt5/mqtt5_shared_subscription/README.md @@ -10,12 +10,12 @@ MQTT5 introduces additional features and enhancements that improve the developme Note: MQTT5 support is currently in **developer preview**. We encourage feedback at all times, but feedback during the preview window is especially valuable in shaping the final product. During the preview period we may make backwards-incompatible changes to the public API, but in general, this is something we will try our best to avoid. -[Shared Subscriptions](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250) allow IoT devices to connect to a group where messages sent to a topic are then relayed to the group in a round-robin-like fashion. This is useful for distributing message load across multiple subscribing MQTT5 clients automatically. This is helpful for load balancing when you have many messages that need to be processed. +[Shared Subscriptions](https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt5-shared-subscription) allow IoT devices to connect to a group where messages sent to a topic are then relayed to the group in a round-robin-like fashion. This is useful for distributing message load across multiple subscribing MQTT5 clients automatically. This is helpful for load balancing when you have many messages that need to be processed. -Shared Subscriptions rely on a group identifier, which tells the MQTT5 broker/server which IoT devices to treat as a group for message distribution. This is done when subscribing by formatting the subscription topic like the following: `$share//`. +Shared Subscriptions rely on a group name/identifier, which tells the MQTT5 broker/server which IoT devices to treat as a group for message distribution. This is done when subscribing by formatting the subscription topic like the following: `$share//`. * `$share`: Tells the MQTT5 broker/server that the device is subscribing to a Shared Subscription. -* ``: Tells the MQTT5 broker/server which group to add this Shared Subscription to. Messages published to a matching topic will be distributed round-robin amongst the group. -* ``: The topic that the Shared Subscription is for. Messages published to this topic will be processed in a round-robin fashion. For example, `test/topic`. +* ``: Tells the MQTT5 broker/server which group to add this Shared Subscription to. Messages published to a matching topic will be distributed round-robin amongst the group. +* ``: The topic that the Shared Subscription is for. Messages published to this topic will be processed in a round-robin fashion. For example, `test/topic`. Shared Subscriptions use a round-robbin like method of distributing messages. For example, say you have three MQTT5 clients all subscribed to the same Shared Subscription group and topic. If five messages are sent to the Shared Subscription topic, the messages will likely be delivered in the following order: * Message 1 -> Client one @@ -71,7 +71,7 @@ Replace with the following with the data from your AWS account: * ``: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`. * ``: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website. -Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` to send the client ID your policy supports. +Note that in a real application, you may want to avoid the use of wildcards in your ClientID and shared subscription group names/identifiers. Wildcards should be used only selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` to send the client ID your policy supports. diff --git a/samples/mqtt5/mqtt5_shared_subscription/main.cpp b/samples/mqtt5/mqtt5_shared_subscription/main.cpp index 47837adf5..96d78c643 100644 --- a/samples/mqtt5/mqtt5_shared_subscription/main.cpp +++ b/samples/mqtt5/mqtt5_shared_subscription/main.cpp @@ -25,10 +25,6 @@ class sample_mqtt5_client std::promise connectionPromise; std::promise stoppedPromise; std::mutex receiveMutex; - std::condition_variable receiveSignal; - uint64_t receivedMessages; - uint64_t expectedMessages; - bool sharedSubscriptionSupportNotAvailable; // A helper function to print a message and then exit the sample. void PrintMessageAndExit(String message, int exitCode) @@ -44,15 +40,10 @@ class sample_mqtt5_client String input_key, String input_ca, String input_clientId, - uint64_t input_count, String input_clientName) { std::shared_ptr result = std::make_shared(); - - result->receivedMessages = 0; - result->expectedMessages = input_count; result->name = input_clientName; - result->sharedSubscriptionSupportNotAvailable = false; Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( input_endpoint, input_cert.c_str(), input_key.c_str()); @@ -123,11 +114,6 @@ class sample_mqtt5_client if (eventData.disconnectPacket != nullptr) { fprintf(stdout, "\tReason code: %u\n", (uint32_t)eventData.disconnectPacket->getReasonCode()); - if (eventData.disconnectPacket->getReasonCode() == - Mqtt5::DisconnectReasonCode::AWS_MQTT5_DRC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED) - { - result->sharedSubscriptionSupportNotAvailable = true; - } } }); @@ -147,11 +133,6 @@ class sample_mqtt5_client fprintf(stdout, "\t\twith UserProperty:(%s,%s)\n", prop.getName().c_str(), prop.getValue().c_str()); } } - result->receivedMessages += 1; - if (result->receivedMessages > result->expectedMessages) - { - result->receiveSignal.notify_all(); - } }); result->client = builder->Build(); @@ -183,7 +164,6 @@ int main(int argc, char *argv[]) cmdData.input_key, cmdData.input_ca, cmdData.input_clientId + String("1"), - cmdData.input_count / 2, String("Publisher")); std::shared_ptr subscriberOne = sample_mqtt5_client::create_mqtt5_client( cmdData.input_endpoint, @@ -191,7 +171,6 @@ int main(int argc, char *argv[]) cmdData.input_key, cmdData.input_ca, cmdData.input_clientId + String("2"), - cmdData.input_count / 2, String("Subscriber One")); std::shared_ptr subscriberTwo = sample_mqtt5_client::create_mqtt5_client( cmdData.input_endpoint, @@ -199,7 +178,6 @@ int main(int argc, char *argv[]) cmdData.input_key, cmdData.input_ca, cmdData.input_clientId + String("3"), - cmdData.input_count / 2, String("Subscriber Two")); if (publisher == nullptr || subscriberOne == nullptr || subscriberTwo == nullptr) @@ -216,6 +194,7 @@ int main(int argc, char *argv[]) { publisher->PrintMessageAndExit("Connection was unsuccessful", -1); } + fprintf(stdout, "[%s] Connected.\n", publisher->name.c_str()); } else { @@ -228,6 +207,7 @@ int main(int argc, char *argv[]) { subscriberOne->PrintMessageAndExit("Connection was unsuccessful", -1); } + fprintf(stdout, "[%s] Connected.\n", subscriberOne->name.c_str()); } else { @@ -240,6 +220,7 @@ int main(int argc, char *argv[]) { subscriberTwo->PrintMessageAndExit("Connection was unsuccessful", -1); } + fprintf(stdout, "[%s] Connected.\n", subscriberTwo->name.c_str()); } else { @@ -288,11 +269,6 @@ int main(int argc, char *argv[]) std::this_thread::sleep_for(std::chrono::milliseconds(1000)); if (subscriberOne->sharedSubscriptionSupportNotAvailable == true) { - if (cmdData.input_isCI == true) - { - // TMP: If this fails subscribing in CI, just exit the sample gracefully - subscriberOne->PrintMessageAndExit("Shared Subscriptions not supported", 0); - } subscriberOne->PrintMessageAndExit("Shared Subscriptions not supported", -1); } Mqtt5::SubAckReasonCode result = subscribeSuccess.get_future().get(); @@ -300,6 +276,15 @@ int main(int argc, char *argv[]) { subscriberOne->PrintMessageAndExit("Failed to subscribe", -1); } + + fprintf( + stdout, + "[%s] Subscribed to topic '%s' in shared subscription group '%s'. \n", + subscriberOne->name.c_str(), + cmdData.input_topic.c_str(), + cmdData.input_groupIdentifier.c_str()); + fprintf( + stdout, "[%s] Full subscribed topic is: '%s'.\n", subscriberOne->name.c_str(), input_sharedTopic.c_str()); } else { @@ -313,11 +298,6 @@ int main(int argc, char *argv[]) std::this_thread::sleep_for(std::chrono::milliseconds(1000)); if (subscriberTwo->sharedSubscriptionSupportNotAvailable == true) { - if (cmdData.input_isCI == true) - { - // TMP: If this fails subscribing in CI, just exit the sample gracefully - subscriberTwo->PrintMessageAndExit("Shared Subscriptions not supported", 0); - } subscriberTwo->PrintMessageAndExit("Shared Subscriptions not supported", -1); } Mqtt5::SubAckReasonCode result = subscribeSuccess.get_future().get(); @@ -325,6 +305,15 @@ int main(int argc, char *argv[]) { subscriberTwo->PrintMessageAndExit("Failed to subscribe", -1); } + + fprintf( + stdout, + "[%s] Subscribed to topic '%s' in shared subscription group '%s'.\n", + subscriberTwo->name.c_str(), + cmdData.input_topic.c_str(), + cmdData.input_groupIdentifier.c_str()); + fprintf( + stdout, "[%s] Full subscribed topic is: '%s'.\n", subscriberTwo->name.c_str(), input_sharedTopic.c_str()); } else { @@ -372,18 +361,8 @@ int main(int argc, char *argv[]) } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - - /*********************** Make sure all the messages got to the subscribers ***************************/ - - std::unique_lock receivedLockOne(subscriberOne->receiveMutex); - subscriberOne->receiveSignal.wait(receivedLockOne, [&, subscriberOne] { - return subscriberOne->receivedMessages >= subscriberOne->expectedMessages; - }); - std::unique_lock receivedLockTwo(subscriberTwo->receiveMutex); - subscriberTwo->receiveSignal.wait(receivedLockTwo, [&, subscriberTwo] { - return subscriberTwo->receivedMessages >= subscriberTwo->expectedMessages; - }); + // Wait 5 seconds to let the last publish go out before unsubscribing + std::this_thread::sleep_for(std::chrono::milliseconds(5000)); /*********************** Unsubscribe the subscribers ***************************/ @@ -396,6 +375,13 @@ int main(int argc, char *argv[]) subscriberOne->PrintMessageAndExit("Unsubscribe failed", -1); } unsubscribeFinishedPromise.get_future().wait(); + fprintf( + stdout, + "[%s] Unsubscribed to topic '%s' in shared subscription group '%s'.\n", + subscriberOne->name.c_str(), + cmdData.input_topic.c_str(), + cmdData.input_groupIdentifier.c_str()); + fprintf(stdout, "[%s] Full unsubscribed topic is: '%s'.\n", subscriberOne->name.c_str(), input_sharedTopic.c_str()); unsubscribeFinishedPromise = std::promise(); if (!subscriberTwo->client->Unsubscribe( @@ -404,6 +390,13 @@ int main(int argc, char *argv[]) subscriberTwo->PrintMessageAndExit("Unsubscribe failed", -1); } unsubscribeFinishedPromise.get_future().wait(); + fprintf( + stdout, + "[%s] Unsubscribed to topic '%s' in shared subscription group '%s'.\n", + subscriberTwo->name.c_str(), + cmdData.input_topic.c_str(), + cmdData.input_groupIdentifier.c_str()); + fprintf(stdout, "[%s] Full unsubscribed topic is: '%s'.\n", subscriberTwo->name.c_str(), input_sharedTopic.c_str()); /*********************** Disconnect all the MQTT5 clients ***************************/ @@ -412,18 +405,21 @@ int main(int argc, char *argv[]) publisher->PrintMessageAndExit("Failed to disconnect. Exiting...", -1); } publisher->stoppedPromise.get_future().wait(); + fprintf(stdout, "[%s] Fully stopped.\n", publisher->name.c_str()); if (!subscriberOne->client->Stop()) { subscriberOne->PrintMessageAndExit("Failed to disconnect. Exiting...", -1); } subscriberOne->stoppedPromise.get_future().wait(); + fprintf(stdout, "[%s] Fully stopped.\n", subscriberOne->name.c_str()); if (!subscriberTwo->client->Stop()) { subscriberTwo->PrintMessageAndExit("Failed to disconnect. Exiting...", -1); } subscriberTwo->stoppedPromise.get_future().wait(); + fprintf(stdout, "[%s] Fully stopped.\n", subscriberTwo->name.c_str()); /*********************** Free the shared pointers (MQTT5 clients) ***************************/ publisher->client = nullptr; From 816a21050b8330ce0695832ae0496ef206146ac2 Mon Sep 17 00:00:00 2001 From: Noah Beard Date: Fri, 21 Apr 2023 14:25:12 -0400 Subject: [PATCH 2/3] Missed a spot. Fixed --- samples/mqtt5/mqtt5_shared_subscription/main.cpp | 8 -------- 1 file changed, 8 deletions(-) diff --git a/samples/mqtt5/mqtt5_shared_subscription/main.cpp b/samples/mqtt5/mqtt5_shared_subscription/main.cpp index 96d78c643..998792ae9 100644 --- a/samples/mqtt5/mqtt5_shared_subscription/main.cpp +++ b/samples/mqtt5/mqtt5_shared_subscription/main.cpp @@ -267,10 +267,6 @@ int main(int argc, char *argv[]) { // Wait just a little bit to see if the client was disconnected due to missing Shared Subscription support. std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - if (subscriberOne->sharedSubscriptionSupportNotAvailable == true) - { - subscriberOne->PrintMessageAndExit("Shared Subscriptions not supported", -1); - } Mqtt5::SubAckReasonCode result = subscribeSuccess.get_future().get(); if (result >= Mqtt5::SubAckReasonCode::AWS_MQTT5_SARC_UNSPECIFIED_ERROR) { @@ -296,10 +292,6 @@ int main(int argc, char *argv[]) { // Wait just a little bit to see if the client was disconnected due to missing Shared Subscription support. std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - if (subscriberTwo->sharedSubscriptionSupportNotAvailable == true) - { - subscriberTwo->PrintMessageAndExit("Shared Subscriptions not supported", -1); - } Mqtt5::SubAckReasonCode result = subscribeSuccess.get_future().get(); if (result >= Mqtt5::SubAckReasonCode::AWS_MQTT5_SARC_UNSPECIFIED_ERROR) { From cb9b0d5d660966bfa979515803072afd44204c2e Mon Sep 17 00:00:00 2001 From: Noah Beard Date: Fri, 21 Apr 2023 14:38:38 -0400 Subject: [PATCH 3/3] Wording adjustment --- samples/mqtt5/mqtt5_shared_subscription/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/mqtt5/mqtt5_shared_subscription/README.md b/samples/mqtt5/mqtt5_shared_subscription/README.md index 65190ddb5..891a0d4ee 100644 --- a/samples/mqtt5/mqtt5_shared_subscription/README.md +++ b/samples/mqtt5/mqtt5_shared_subscription/README.md @@ -71,7 +71,7 @@ Replace with the following with the data from your AWS account: * ``: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`. * ``: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website. -Note that in a real application, you may want to avoid the use of wildcards in your ClientID and shared subscription group names/identifiers. Wildcards should be used only selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` to send the client ID your policy supports. +Note that in a real application, you may want to avoid the use of wildcards in your ClientID and shared subscription group names/identifiers. Wildcards should only be used selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` to send the client ID your policy supports.