diff --git a/documents/MQTT5_Userguide.md b/documents/MQTT5_Userguide.md index b2102f568..9fd4cef8e 100644 --- a/documents/MQTT5_Userguide.md +++ b/documents/MQTT5_Userguide.md @@ -233,7 +233,8 @@ Emitted once the client has shutdown any associated network connection and enter ## How to Process Message -[`withPublishReceivedCallback`](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_iot_1_1_mqtt5_client_builder.html#a178bd62d671ea2f273841e2e097744e8) will get involved when a publish is received. The callback should be set before client get build. Please note, once +[`withPublishReceivedCallback`](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_iot_1_1_mqtt5_client_builder.html#a178bd62d671ea2f273841e2e097744e8) will get involved when a publish is received. The callback should be set before building the client. Please note, once a MQTT5 client is built and finalized, the client configuration is immutable. + ``` // Create Mqtt5Client Builder Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(...); @@ -601,3 +602,4 @@ Below are some best practices for the MQTT5 client that are recommended to follo * Use the minimum QoS you can get away with for the lowest latency and bandwidth costs. For example, if you are sending data consistently multiple times per second and do not have to have a guarantee the server got each and every publish, using QoS 0 may be ideal compared to QoS 1. Of course, this heavily depends on your use case but generally it is recommended to use the lowest QoS possible. * If you are getting unexpected disconnects when trying to connect to AWS IoT Core, make sure to check your IoT Core Thing’s policy and permissions to make sure your device is has the permissions it needs to connect! * For **Publish**, **Subscribe**, and **Unsubscribe**, you can check the reason codes in the CompletionCallbacks to see if the operation actually succeeded. +* You MUST NOT perform blocking operations on any callback, or you will cause a deadlock. For example: in the on_publish_received callback, do not send a publish, and then wait for the future to complete within the callback. The Client cannot do work until your callback returs, so the thread will be stuck. diff --git a/samples/README.md b/samples/README.md index 497988ec6..9cc92efe2 100644 --- a/samples/README.md +++ b/samples/README.md @@ -161,7 +161,7 @@ Your Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot- "iot:Receive" ], "Resource": [ - "arn:aws:iot:region:account:topic/test/topic1" + "arn:aws:iot:region:account:topic/test/topic" ] }, { @@ -190,8 +190,9 @@ Your Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot- To run the basic MQTT5 PubSub use the following command: ``` sh -./mqtt5_pubsub --endpoint --ca_file +./mqtt5_pubsub --endpoint --ca_file --cert --key +--topic ``` ## Websocket Connect diff --git a/samples/mqtt5/mqtt5_pubsub/main.cpp b/samples/mqtt5/mqtt5_pubsub/main.cpp index beceacd2b..8451e9081 100644 --- a/samples/mqtt5/mqtt5_pubsub/main.cpp +++ b/samples/mqtt5/mqtt5_pubsub/main.cpp @@ -22,7 +22,6 @@ int main(int argc, char *argv[]) */ ApiHandle apiHandle; uint32_t messageCount = 10; - String testTopic = "test/topic1"; std::mutex receiveMutex; std::condition_variable receiveSignal; @@ -35,20 +34,40 @@ int main(int argc, char *argv[]) cmdUtils.RegisterCommand("key", "", "Path to your key in PEM format."); cmdUtils.RegisterCommand("cert", "", "Path to your client certificate in PEM format."); cmdUtils.AddCommonProxyCommands(); + cmdUtils.AddCommonTopicMessageCommands(); cmdUtils.RegisterCommand("client_id", "", "Client id to use (optional, default='test-*')"); + cmdUtils.RegisterCommand("port_override", "", "The port override to use when connecting (optional)"); + cmdUtils.RegisterCommand("count", "", "The number of messages to send (optional, default='10')"); cmdUtils.AddLoggingCommands(); const char **const_argv = (const char **)argv; cmdUtils.SendArguments(const_argv, const_argv + argc); cmdUtils.StartLoggingBasedOnCommand(&apiHandle); + String topic = cmdUtils.GetCommandOrDefault("topic", "test/topic"); + String clientId = cmdUtils.GetCommandOrDefault("client_id", String("test-") + Aws::Crt::UUID().ToString()); + + String messagePayload = cmdUtils.GetCommandOrDefault("message", "Hello world!"); + if (cmdUtils.HasCommand("count")) + { + int count = atoi(cmdUtils.GetCommand("count").c_str()); + if (count > 0) + { + messageCount = count; + } + } + // Create a Client using Mqtt5ClientBuilder Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( cmdUtils.GetCommand("endpoint"), cmdUtils.GetCommand("cert").c_str(), cmdUtils.GetCommand("key").c_str()); + if (builder == nullptr) + { + printf("Failed to setup mqtt5 client builder."); + return -1; + } + // Setup connection options std::shared_ptr connectOptions = std::make_shared(); - // Get the client ID to send with the connection - String clientId = cmdUtils.GetCommandOrDefault("client_id", String("test-") + UUID().ToString()); connectOptions->withClientId(clientId); builder->withConnectOptions(connectOptions); @@ -89,27 +108,32 @@ int main(int argc, char *argv[]) disconnectPromise.set_value(); }); - builder->withPublishReceivedCallback([&receiveMutex, &receivedCount, &receiveSignal]( - Mqtt5::Mqtt5Client &, const Mqtt5::PublishReceivedEventData &eventData) { - if (eventData.publishPacket == nullptr) - return; + /* + * This is invoked upon the receipt of a Publish on a subscribed topic. + */ + builder->withPublishReceivedCallback( + [&receiveMutex, &receivedCount, &receiveSignal]( + Mqtt5::Mqtt5Client & /*client*/, const Mqtt5::PublishReceivedEventData &eventData) { + if (eventData.publishPacket == nullptr) + return; - std::lock_guard lock(receiveMutex); - ++receivedCount; - fprintf(stdout, "Publish received on topic %s:", eventData.publishPacket->getTopic().c_str()); - fwrite(eventData.publishPacket->getPayload().ptr, 1, eventData.publishPacket->getPayload().len, stdout); - fprintf(stdout, "\n"); + std::lock_guard lock(receiveMutex); + ++receivedCount; + fprintf(stdout, "Publish received on topic %s:", eventData.publishPacket->getTopic().c_str()); + fwrite(eventData.publishPacket->getPayload().ptr, 1, eventData.publishPacket->getPayload().len, stdout); + fprintf(stdout, "\n"); - for (Mqtt5::UserProperty prop : eventData.publishPacket->getUserProperties()) - { - fprintf(stdout, "\twith UserProperty:(%s,%s)\n", prop.getName().c_str(), prop.getValue().c_str()); - } - receiveSignal.notify_all(); - }); + for (Mqtt5::UserProperty prop : eventData.publishPacket->getUserProperties()) + { + fprintf(stdout, "\twith UserProperty:(%s,%s)\n", prop.getName().c_str(), prop.getValue().c_str()); + } + receiveSignal.notify_all(); + }); // Create Mqtt5Client std::shared_ptr client = builder->Build(); + // Clean up the builder delete builder; if (client == nullptr) @@ -118,6 +142,7 @@ int main(int argc, char *argv[]) return -1; } + // Start mqtt5 connection session if (client->Start()) { if (connectionPromise.get_future().get() == false) @@ -125,59 +150,117 @@ int main(int argc, char *argv[]) return -1; } - Mqtt5::Subscription sub1(testTopic, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE); + auto onSubAck = + [&subscribeSuccess]( + std::shared_ptr, int error_code, std::shared_ptr suback) { + if (error_code != 0) + { + fprintf( + stdout, + "MQTT5 Client Subscription failed with error code: (%d)%s\n", + error_code, + aws_error_debug_str(error_code)); + subscribeSuccess.set_value(false); + } + if (suback != nullptr) + { + for (Mqtt5::SubAckReasonCode reasonCode : suback->getReasonCodes()) + { + if (reasonCode > Mqtt5::SubAckReasonCode::AWS_MQTT5_SARC_UNSPECIFIED_ERROR) + { + fprintf( + stdout, + "MQTT5 Client Subscription failed with server error code: (%d)%s\n", + reasonCode, + suback->getReasonString()->c_str()); + subscribeSuccess.set_value(false); + return; + } + } + } + subscribeSuccess.set_value(true); + }; + + Mqtt5::Subscription sub1(topic, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE); sub1.withNoLocal(false); std::shared_ptr subPacket = std::make_shared(); subPacket->withSubscription(std::move(sub1)); - if (client->Subscribe( - subPacket, - [&subscribeSuccess]( - std::shared_ptr, int error_code, std::shared_ptr) { - subscribeSuccess.set_value(error_code == 0); - })) + + if (client->Subscribe(subPacket, onSubAck)) { // Waiting for subscription completed. if (subscribeSuccess.get_future().get() == true) { - fprintf(stdout, "Subscription Success.\n"); // Setup publish completion callback. The callback will get triggered when the pulbish completes (when // the client received the PubAck from the server). - Aws::Crt::Mqtt5::OnPublishCompletionHandler callback = - [](std::shared_ptr client, - int, - std::shared_ptr result) { - if (!result->wasSuccessful()) + auto onPublishComplete = [](std::shared_ptr client, + int, + std::shared_ptr result) { + if (!result->wasSuccessful()) + { + fprintf(stdout, "Publish failed with error_code: %d", result->getErrorCode()); + } + else if (result != nullptr) + { + std::shared_ptr puback = + std::dynamic_pointer_cast(result->getAck()); + if (puback->getReasonCode() == 0) { - fprintf(stdout, "Publish failed with error_code: %d", result->getErrorCode()); + fprintf(stdout, "Publish Succeed.\n"); } else { - fprintf(stdout, "Publish Succeed.\n"); - }; + fprintf( + stdout, + "PubACK reason code: %d : %s\n", + puback->getReasonCode(), + puback->getReasonString()->c_str()); + } }; + }; uint32_t publishedCount = 0; while (publishedCount < messageCount) { - String message = String("Mqtt5 Publish Message: ") + std::to_string(publishedCount).c_str(); + String message = messagePayload + std::to_string(publishedCount).c_str(); ByteCursor payload = ByteCursorFromString(message); - std::shared_ptr publish = std::make_shared( - testTopic, payload, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE); - if (client->Publish(publish, std::move(callback))) + std::shared_ptr publish = + std::make_shared(topic, payload, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE); + if (client->Publish(publish, onPublishComplete)) + { ++publishedCount; + } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } - std::unique_lock receivedLock(receiveMutex); - receiveSignal.wait(receivedLock, [&] { return receivedCount >= messageCount; }); + { + std::unique_lock receivedLock(receiveMutex); + receiveSignal.wait(receivedLock, [&] { return receivedCount >= messageCount; }); + } + + /* + * Unsubscribe from the topic. + */ + std::promise unsubscribeFinishedPromise; + std::shared_ptr unsub = std::make_shared(); + unsub->withTopicFilter(topic); + if (!client->Unsubscribe( + unsub, [&](std::shared_ptr, int, std::shared_ptr) { + unsubscribeFinishedPromise.set_value(); + })) + { + fprintf(stdout, "Unsubscription failed.\n"); + exit(-1); + } + unsubscribeFinishedPromise.get_future().wait(); } else { - fprintf(stdout, "Subscription failed. Please check the SubAck packet for details.\n"); + fprintf(stdout, "Subscription failed.\n"); } } else @@ -185,6 +268,7 @@ int main(int argc, char *argv[]) fprintf(stdout, "Subscribe operation failed on client.\n"); } + /* Disconnect */ if (!client->Stop()) { fprintf(stdout, "Failed to disconnect from the mqtt connection. Exiting..\n");