Skip to content

ReadMe and Sample update #504

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion documents/MQTT5_Userguide.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ Emitted once the client has shutdown any associated network connection and enter


## How to Process Message
[`withPublishReceivedCallback`](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_iot_1_1_mqtt5_client_builder.html#a178bd62d671ea2f273841e2e097744e8) will get involved when a publish is received. The callback should be set before client get build. Please note, once
[`withPublishReceivedCallback`](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_iot_1_1_mqtt5_client_builder.html#a178bd62d671ea2f273841e2e097744e8) will get involved when a publish is received. The callback should be set before building the client. Please note, once a MQTT5 client is built and finalized, the client configuration is immutable.

```
// Create Mqtt5Client Builder
Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(...);
Expand Down Expand Up @@ -601,3 +602,4 @@ Below are some best practices for the MQTT5 client that are recommended to follo
* Use the minimum QoS you can get away with for the lowest latency and bandwidth costs. For example, if you are sending data consistently multiple times per second and do not have to have a guarantee the server got each and every publish, using QoS 0 may be ideal compared to QoS 1. Of course, this heavily depends on your use case but generally it is recommended to use the lowest QoS possible.
* If you are getting unexpected disconnects when trying to connect to AWS IoT Core, make sure to check your IoT Core Thing’s policy and permissions to make sure your device is has the permissions it needs to connect!
* For **Publish**, **Subscribe**, and **Unsubscribe**, you can check the reason codes in the CompletionCallbacks to see if the operation actually succeeded.
* You MUST NOT perform blocking operations on any callback, or you will cause a deadlock. For example: in the on_publish_received callback, do not send a publish, and then wait for the future to complete within the callback. The Client cannot do work until your callback returs, so the thread will be stuck.
5 changes: 3 additions & 2 deletions samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ Your Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-
"iot:Receive"
],
"Resource": [
"arn:aws:iot:<b>region</b>:<b>account</b>:topic/test/topic1"
"arn:aws:iot:<b>region</b>:<b>account</b>:topic/test/topic"
]
},
{
Expand Down Expand Up @@ -190,8 +190,9 @@ Your Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-
To run the basic MQTT5 PubSub use the following command:

``` sh
./mqtt5_pubsub --endpoint <endpoint> --ca_file <path to root CA>
./mqtt5_pubsub --endpoint <endpoint> --ca_file <path to root CA>
--cert <path to the certificate> --key <path to the private key>
--topic <topic name>
```

## Websocket Connect
Expand Down
166 changes: 125 additions & 41 deletions samples/mqtt5/mqtt5_pubsub/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ int main(int argc, char *argv[])
*/
ApiHandle apiHandle;
uint32_t messageCount = 10;
String testTopic = "test/topic1";

std::mutex receiveMutex;
std::condition_variable receiveSignal;
Expand All @@ -35,20 +34,40 @@ int main(int argc, char *argv[])
cmdUtils.RegisterCommand("key", "<path>", "Path to your key in PEM format.");
cmdUtils.RegisterCommand("cert", "<path>", "Path to your client certificate in PEM format.");
cmdUtils.AddCommonProxyCommands();
cmdUtils.AddCommonTopicMessageCommands();
cmdUtils.RegisterCommand("client_id", "<str>", "Client id to use (optional, default='test-*')");
cmdUtils.RegisterCommand("port_override", "<int>", "The port override to use when connecting (optional)");
cmdUtils.RegisterCommand("count", "<int>", "The number of messages to send (optional, default='10')");
cmdUtils.AddLoggingCommands();
const char **const_argv = (const char **)argv;
cmdUtils.SendArguments(const_argv, const_argv + argc);
cmdUtils.StartLoggingBasedOnCommand(&apiHandle);

String topic = cmdUtils.GetCommandOrDefault("topic", "test/topic");
String clientId = cmdUtils.GetCommandOrDefault("client_id", String("test-") + Aws::Crt::UUID().ToString());

String messagePayload = cmdUtils.GetCommandOrDefault("message", "Hello world!");
if (cmdUtils.HasCommand("count"))
{
int count = atoi(cmdUtils.GetCommand("count").c_str());
if (count > 0)
{
messageCount = count;
}
}

// Create a Client using Mqtt5ClientBuilder
Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
cmdUtils.GetCommand("endpoint"), cmdUtils.GetCommand("cert").c_str(), cmdUtils.GetCommand("key").c_str());

if (builder == nullptr)
{
printf("Failed to setup mqtt5 client builder.");
return -1;
}

// Setup connection options
std::shared_ptr<Mqtt5::ConnectPacket> connectOptions = std::make_shared<Mqtt5::ConnectPacket>();
// Get the client ID to send with the connection
String clientId = cmdUtils.GetCommandOrDefault("client_id", String("test-") + UUID().ToString());
connectOptions->withClientId(clientId);
builder->withConnectOptions(connectOptions);

Expand Down Expand Up @@ -89,27 +108,32 @@ int main(int argc, char *argv[])
disconnectPromise.set_value();
});

builder->withPublishReceivedCallback([&receiveMutex, &receivedCount, &receiveSignal](
Mqtt5::Mqtt5Client &, const Mqtt5::PublishReceivedEventData &eventData) {
if (eventData.publishPacket == nullptr)
return;
/*
* This is invoked upon the receipt of a Publish on a subscribed topic.
*/
builder->withPublishReceivedCallback(
[&receiveMutex, &receivedCount, &receiveSignal](
Mqtt5::Mqtt5Client & /*client*/, const Mqtt5::PublishReceivedEventData &eventData) {
if (eventData.publishPacket == nullptr)
return;

std::lock_guard<std::mutex> lock(receiveMutex);
++receivedCount;
fprintf(stdout, "Publish received on topic %s:", eventData.publishPacket->getTopic().c_str());
fwrite(eventData.publishPacket->getPayload().ptr, 1, eventData.publishPacket->getPayload().len, stdout);
fprintf(stdout, "\n");
std::lock_guard<std::mutex> lock(receiveMutex);
++receivedCount;
fprintf(stdout, "Publish received on topic %s:", eventData.publishPacket->getTopic().c_str());
fwrite(eventData.publishPacket->getPayload().ptr, 1, eventData.publishPacket->getPayload().len, stdout);
fprintf(stdout, "\n");

for (Mqtt5::UserProperty prop : eventData.publishPacket->getUserProperties())
{
fprintf(stdout, "\twith UserProperty:(%s,%s)\n", prop.getName().c_str(), prop.getValue().c_str());
}
receiveSignal.notify_all();
});
for (Mqtt5::UserProperty prop : eventData.publishPacket->getUserProperties())
{
fprintf(stdout, "\twith UserProperty:(%s,%s)\n", prop.getName().c_str(), prop.getValue().c_str());
}
receiveSignal.notify_all();
});

// Create Mqtt5Client
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> client = builder->Build();

// Clean up the builder
delete builder;

if (client == nullptr)
Expand All @@ -118,73 +142,133 @@ int main(int argc, char *argv[])
return -1;
}

// Start mqtt5 connection session
if (client->Start())
{
if (connectionPromise.get_future().get() == false)
{
return -1;
}

Mqtt5::Subscription sub1(testTopic, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE);
auto onSubAck =
[&subscribeSuccess](
std::shared_ptr<Mqtt5::Mqtt5Client>, int error_code, std::shared_ptr<Mqtt5::SubAckPacket> suback) {
if (error_code != 0)
{
fprintf(
stdout,
"MQTT5 Client Subscription failed with error code: (%d)%s\n",
error_code,
aws_error_debug_str(error_code));
subscribeSuccess.set_value(false);
}
if (suback != nullptr)
{
for (Mqtt5::SubAckReasonCode reasonCode : suback->getReasonCodes())
{
if (reasonCode > Mqtt5::SubAckReasonCode::AWS_MQTT5_SARC_UNSPECIFIED_ERROR)
{
fprintf(
stdout,
"MQTT5 Client Subscription failed with server error code: (%d)%s\n",
reasonCode,
suback->getReasonString()->c_str());
subscribeSuccess.set_value(false);
return;
}
}
}
subscribeSuccess.set_value(true);
};

Mqtt5::Subscription sub1(topic, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE);
sub1.withNoLocal(false);
std::shared_ptr<Mqtt5::SubscribePacket> subPacket = std::make_shared<Mqtt5::SubscribePacket>();
subPacket->withSubscription(std::move(sub1));
if (client->Subscribe(
subPacket,
[&subscribeSuccess](
std::shared_ptr<Mqtt5::Mqtt5Client>, int error_code, std::shared_ptr<Mqtt5::SubAckPacket>) {
subscribeSuccess.set_value(error_code == 0);
}))

if (client->Subscribe(subPacket, onSubAck))
{
// Waiting for subscription completed.
if (subscribeSuccess.get_future().get() == true)
{

fprintf(stdout, "Subscription Success.\n");

// Setup publish completion callback. The callback will get triggered when the pulbish completes (when
// the client received the PubAck from the server).
Aws::Crt::Mqtt5::OnPublishCompletionHandler callback =
[](std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> client,
int,
std::shared_ptr<Aws::Crt::Mqtt5::PublishResult> result) {
if (!result->wasSuccessful())
auto onPublishComplete = [](std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> client,
int,
std::shared_ptr<Aws::Crt::Mqtt5::PublishResult> result) {
if (!result->wasSuccessful())
{
fprintf(stdout, "Publish failed with error_code: %d", result->getErrorCode());
}
else if (result != nullptr)
{
std::shared_ptr<Mqtt5::PubAckPacket> puback =
std::dynamic_pointer_cast<Mqtt5::PubAckPacket>(result->getAck());
if (puback->getReasonCode() == 0)
{
fprintf(stdout, "Publish failed with error_code: %d", result->getErrorCode());
fprintf(stdout, "Publish Succeed.\n");
}
else
{
fprintf(stdout, "Publish Succeed.\n");
};
fprintf(
stdout,
"PubACK reason code: %d : %s\n",
puback->getReasonCode(),
puback->getReasonString()->c_str());
}
};
};

uint32_t publishedCount = 0;
while (publishedCount < messageCount)
{
String message = String("Mqtt5 Publish Message: ") + std::to_string(publishedCount).c_str();
String message = messagePayload + std::to_string(publishedCount).c_str();
ByteCursor payload = ByteCursorFromString(message);

std::shared_ptr<Mqtt5::PublishPacket> publish = std::make_shared<Mqtt5::PublishPacket>(
testTopic, payload, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE);
if (client->Publish(publish, std::move(callback)))
std::shared_ptr<Mqtt5::PublishPacket> publish =
std::make_shared<Mqtt5::PublishPacket>(topic, payload, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE);
if (client->Publish(publish, onPublishComplete))
{
++publishedCount;
}

std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

std::unique_lock<std::mutex> receivedLock(receiveMutex);
receiveSignal.wait(receivedLock, [&] { return receivedCount >= messageCount; });
{
std::unique_lock<std::mutex> receivedLock(receiveMutex);
receiveSignal.wait(receivedLock, [&] { return receivedCount >= messageCount; });
}

/*
* Unsubscribe from the topic.
*/
std::promise<void> unsubscribeFinishedPromise;
std::shared_ptr<Mqtt5::UnsubscribePacket> unsub = std::make_shared<Mqtt5::UnsubscribePacket>();
unsub->withTopicFilter(topic);
if (!client->Unsubscribe(
unsub, [&](std::shared_ptr<Mqtt5::Mqtt5Client>, int, std::shared_ptr<Mqtt5::UnSubAckPacket>) {
unsubscribeFinishedPromise.set_value();
}))
{
fprintf(stdout, "Unsubscription failed.\n");
exit(-1);
}
unsubscribeFinishedPromise.get_future().wait();
}
else
{
fprintf(stdout, "Subscription failed. Please check the SubAck packet for details.\n");
fprintf(stdout, "Subscription failed.\n");
}
}
else
{
fprintf(stdout, "Subscribe operation failed on client.\n");
}

/* Disconnect */
if (!client->Stop())
{
fprintf(stdout, "Failed to disconnect from the mqtt connection. Exiting..\n");
Expand Down