Skip to content

Update Mqtt5 Library with Latest API changes #568

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 19, 2023
48 changes: 34 additions & 14 deletions documents/MQTT5_Userguide.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,25 @@ SDK MQTT5 support comes from a separate client implementation. In doing so, we

* With the 311 implementation, there were two separate objects, a client and a connection. With MQTT5, there is only the client.

* The user callbacks in the Mqtt5 do not provide a client reference in the way the Mqtt3 API does.
Example:
```
// Client reference
std::shared_ptr<Mqtt5Client> client = nullptr;

// Create Mqtt5Client Builder
Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(...);

// Setup lifecycle callbacks
builder->WithClientConnectionSuccessCallback(
[&client](const Mqtt5::OnConnectionSuccessEventData &eventData) {
// Do mqtt5 client operations
client->Publish(...);
});

// Build Mqtt5Client
client = builder->Build();
```



Expand Down Expand Up @@ -133,33 +152,33 @@ The MQTT5 client emits a set of events related to state and network status chang
std::promise<void> stoppedPromise;

// Setup lifecycle callbacks
builder->withClientConnectionSuccessCallback(
[&connectionPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnConnectionSuccessEventData &eventData) {
builder->WithClientConnectionSuccessCallback(
[&connectionPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) {
fprintf(
stdout,
"Mqtt5 Client connection succeed, clientid: %s.\n",
eventData.negotiatedSettings->getClientId().c_str());
connectionPromise.set_value(true);
});

builder->withClientConnectionFailureCallback(
[&connectionPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnConnectionFailureEventData &eventData) {
builder->WithClientConnectionFailureCallback(
[&connectionPromise](const Mqtt5::OnConnectionFailureEventData &eventData) {
fprintf(
stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode));
connectionPromise.set_value(false);
});

builder->withClientStoppedCallback([&stoppedPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnStoppedEventData &) {
builder->WithClientStoppedCallback([&stoppedPromise](const Mqtt5::OnStoppedEventData &) {
fprintf(stdout, "Mqtt5 Client stopped.\n");
stoppedPromise.set_value();
});

builder->withClientAttemptingConnectCallback([](Mqtt5::Mqtt5Client &, const Mqtt5::OnAttemptingConnectEventData &) {
builder->WithClientAttemptingConnectCallback([](Mqtt5::OnAttemptingConnectEventData &) {
fprintf(stdout, "Mqtt5 Client attempting connection...\n");
});

builder->withClientDisconnectionCallback(
[](Mqtt5::Mqtt5Client &, const Mqtt5::OnDisconnectionEventData &eventData) {
builder->WithClientDisconnectionCallback(
[](const Mqtt5::OnDisconnectionEventData &eventData) {
fprintf(stdout, "Mqtt5 Client disconnection with reason: %s.\n", aws_error_debug_str(eventData.errorCode));
});

Expand Down Expand Up @@ -238,7 +257,7 @@ Emitted once the client has shutdown any associated network connection and enter
// Create Mqtt5Client Builder
Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(...);

builder->withPublishReceivedCallback([](Mqtt5::Mqtt5Client &, const Mqtt5::PublishReceivedEventData &eventData) {
builder->WithPublishReceivedCallback([](const Mqtt5::PublishReceivedEventData &eventData) {
if (eventData.publishPacket == nullptr)
return;
fprintf(stdout, "Publish received on topic %s:", eventData.publishPacket->getTopic().c_str());
Expand Down Expand Up @@ -503,7 +522,7 @@ No matter what your connection transport or authentication method is, you may co
Http::HttpClientConnectionProxyOptions proxyOptions;
proxyOptions.HostName = "<proxyHost>";
proxyOptions.Port = <proxyPort>;
builder->withHttpProxyOptions(proxyOptions);
builder->WithHttpProxyOptions(proxyOptions);

/* You can setup other client options and lifecycle event callbacks before call builder->Build().
** Once the the client get built, you could no longer update the client options or connection options
Expand Down Expand Up @@ -548,9 +567,9 @@ The Subscribe operation takes a description of the SUBSCRIBE packet you wish to
subscriptionList.push_back(data2);
subscriptionList.push_back(data3);

// Creaet a SubscribePacket with the subscription list. You can also use packet->withSubscription(subscription) to push_back a single subscription data.
// Creaet a SubscribePacket with the subscription list. You can also use packet->WithSubscription(subscription) to push_back a single subscription data.
std::shared_ptr<Mqtt5::SubscribePacket> packet = std::make_shared<SubscribePacket>();
packet->withSubscriptions(subscriptionList);
packet->WithSubscriptions(subscriptionList);

bool subSuccess = mqtt5Client->Subscribe(
packet,
Expand Down Expand Up @@ -584,7 +603,7 @@ The Unsubscribe operation takes a description of the UNSUBSCRIBE packet you wish
topics.push_back(topic1);
topics.push_back(topic2);
std::shared_ptr<UnsubscribePacket> unsub = std::make_shared<UnsubscribePacket>();
unsub->withTopicFilters(topics);
unsub->WithTopicFilters(topics);
bool unsubSuccess = mqtt5Client->Unsubscribe(
packet,
[](std::shared_ptr<Mqtt5::Mqtt5Client>, int, std::shared_ptr<Mqtt5::UnSubAckPacket> unsuback){
Expand Down Expand Up @@ -621,7 +640,7 @@ If the PUBLISH was a QoS 1 publish, then the completion callback returns a PubAc
std::shared_ptr<PublishPacket> publish = std::make_shared<PublishPacket>(testTopic, payload, QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE);

// Setup publish completion callback. The callback will get triggered when the pulbish completes and publish result returned from the server
OnPublishCompletionHandler callback = [](std::shared_ptr<Mqtt5Client> client, int, std::shared_ptr<PublishResult> result){
OnPublishCompletionHandler callback = [](int, std::shared_ptr<PublishResult> result){
if(!result->wasSuccessful())
{
fprintf(stdout, "Publish failed with error_code: %d", result->getErrorCode());
Expand Down Expand Up @@ -650,3 +669,4 @@ Below are some best practices for the MQTT5 client that are recommended to follo
* If you are getting unexpected disconnects when trying to connect to AWS IoT Core, make sure to check your IoT Core Thing’s policy and permissions to make sure your device is has the permissions it needs to connect!
* For **Publish**, **Subscribe**, and **Unsubscribe**, you can check the reason codes in the CompletionCallbacks to see if the operation actually succeeded.
* You MUST NOT perform blocking operations on any callback, or you will cause a deadlock. For example: in the on_publish_received callback, do not send a publish, and then wait for the future to complete within the callback. The Client cannot do work until your callback returns, so the thread will be stuck.
* You can use `LastError()` and `ErrorDebugString(error_code)` to get the error code and error message.
113 changes: 54 additions & 59 deletions samples/mqtt5/mqtt5_pubsub/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,21 @@ int main(int argc, char *argv[])
Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str());

// Check if the builder setup correctly.
if (builder == nullptr)
{
printf("Failed to setup mqtt5 client builder.");
printf(
"Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError()));
return -1;
}

// Setup connection options
std::shared_ptr<Mqtt5::ConnectPacket> connectOptions = std::make_shared<Mqtt5::ConnectPacket>();
connectOptions->withClientId(cmdData.input_clientId);
builder->withConnectOptions(connectOptions);
connectOptions->WithClientId(cmdData.input_clientId);
builder->WithConnectOptions(connectOptions);
if (cmdData.input_port != 0)
{
builder->withPort(static_cast<uint16_t>(cmdData.input_port));
builder->WithPort(static_cast<uint16_t>(cmdData.input_port));
}

std::promise<bool> connectionPromise;
Expand All @@ -58,37 +60,34 @@ int main(int argc, char *argv[])
std::promise<bool> subscribeSuccess;

// Setup lifecycle callbacks
builder->withClientConnectionSuccessCallback(
[&connectionPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnConnectionSuccessEventData &eventData) {
builder->WithClientConnectionSuccessCallback(
[&connectionPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) {
fprintf(
stdout,
"Mqtt5 Client connection succeed, clientid: %s.\n",
eventData.negotiatedSettings->getClientId().c_str());
connectionPromise.set_value(true);
});
builder->withClientConnectionFailureCallback(
[&connectionPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnConnectionFailureEventData &eventData) {
fprintf(
stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode));
connectionPromise.set_value(false);
});
builder->withClientStoppedCallback([&stoppedPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnStoppedEventData &) {
builder->WithClientConnectionFailureCallback([&connectionPromise](
const Mqtt5::OnConnectionFailureEventData &eventData) {
fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode));
connectionPromise.set_value(false);
});
builder->WithClientStoppedCallback([&stoppedPromise](const Mqtt5::OnStoppedEventData &) {
fprintf(stdout, "Mqtt5 Client stopped.\n");
stoppedPromise.set_value();
});
builder->withClientAttemptingConnectCallback([](Mqtt5::Mqtt5Client &, const Mqtt5::OnAttemptingConnectEventData &) {
builder->WithClientAttemptingConnectCallback([](const Mqtt5::OnAttemptingConnectEventData &) {
fprintf(stdout, "Mqtt5 Client attempting connection...\n");
});
builder->withClientDisconnectionCallback(
[&disconnectPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnDisconnectionEventData &eventData) {
fprintf(stdout, "Mqtt5 Client disconnection with reason: %s.\n", aws_error_debug_str(eventData.errorCode));
disconnectPromise.set_value();
});
builder->WithClientDisconnectionCallback([&disconnectPromise](const Mqtt5::OnDisconnectionEventData &eventData) {
fprintf(stdout, "Mqtt5 Client disconnection with reason: %s.\n", aws_error_debug_str(eventData.errorCode));
disconnectPromise.set_value();
});

// This is invoked upon the receipt of a Publish on a subscribed topic.
builder->withPublishReceivedCallback(
[&receiveMutex, &receivedCount, &receiveSignal](
Mqtt5::Mqtt5Client & /*client*/, const Mqtt5::PublishReceivedEventData &eventData) {
builder->WithPublishReceivedCallback(
[&receiveMutex, &receivedCount, &receiveSignal](const Mqtt5::PublishReceivedEventData &eventData) {
if (eventData.publishPacket == nullptr)
return;

Expand All @@ -113,7 +112,8 @@ int main(int argc, char *argv[])

if (client == nullptr)
{
fprintf(stdout, "Client creation failed.\n");
fprintf(
stdout, "Failed to Init Mqtt5Client with error code %d: %s", LastError(), ErrorDebugString(LastError()));
return -1;
}

Expand All @@ -127,41 +127,39 @@ int main(int argc, char *argv[])
return -1;
}

auto onSubAck =
[&subscribeSuccess](
std::shared_ptr<Mqtt5::Mqtt5Client>, int error_code, std::shared_ptr<Mqtt5::SubAckPacket> suback) {
if (error_code != 0)
{
fprintf(
stdout,
"MQTT5 Client Subscription failed with error code: (%d)%s\n",
error_code,
aws_error_debug_str(error_code));
subscribeSuccess.set_value(false);
}
if (suback != nullptr)
auto onSubAck = [&subscribeSuccess](int error_code, std::shared_ptr<Mqtt5::SubAckPacket> suback) {
if (error_code != 0)
{
fprintf(
stdout,
"MQTT5 Client Subscription failed with error code: (%d)%s\n",
error_code,
aws_error_debug_str(error_code));
subscribeSuccess.set_value(false);
}
if (suback != nullptr)
{
for (Mqtt5::SubAckReasonCode reasonCode : suback->getReasonCodes())
{
for (Mqtt5::SubAckReasonCode reasonCode : suback->getReasonCodes())
if (reasonCode > Mqtt5::SubAckReasonCode::AWS_MQTT5_SARC_UNSPECIFIED_ERROR)
{
if (reasonCode > Mqtt5::SubAckReasonCode::AWS_MQTT5_SARC_UNSPECIFIED_ERROR)
{
fprintf(
stdout,
"MQTT5 Client Subscription failed with server error code: (%d)%s\n",
reasonCode,
suback->getReasonString()->c_str());
subscribeSuccess.set_value(false);
return;
}
fprintf(
stdout,
"MQTT5 Client Subscription failed with server error code: (%d)%s\n",
reasonCode,
suback->getReasonString()->c_str());
subscribeSuccess.set_value(false);
return;
}
}
subscribeSuccess.set_value(true);
};
}
subscribeSuccess.set_value(true);
};

Mqtt5::Subscription sub1(cmdData.input_topic, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE);
sub1.withNoLocal(false);
sub1.WithNoLocal(false);
std::shared_ptr<Mqtt5::SubscribePacket> subPacket = std::make_shared<Mqtt5::SubscribePacket>();
subPacket->withSubscription(std::move(sub1));
subPacket->WithSubscription(std::move(sub1));

if (client->Subscribe(subPacket, onSubAck))
{
Expand All @@ -174,9 +172,7 @@ int main(int argc, char *argv[])
* Setup publish completion callback. The callback will get triggered when the publish completes (when
* the client received the PubAck from the server).
*/
auto onPublishComplete = [](std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> client,
int,
std::shared_ptr<Aws::Crt::Mqtt5::PublishResult> result) {
auto onPublishComplete = [](int, std::shared_ptr<Aws::Crt::Mqtt5::PublishResult> result) {
if (!result->wasSuccessful())
{
fprintf(stdout, "Publish failed with error_code: %d", result->getErrorCode());
Expand Down Expand Up @@ -225,11 +221,10 @@ int main(int argc, char *argv[])
// Unsubscribe from the topic.
std::promise<void> unsubscribeFinishedPromise;
std::shared_ptr<Mqtt5::UnsubscribePacket> unsub = std::make_shared<Mqtt5::UnsubscribePacket>();
unsub->withTopicFilter(cmdData.input_topic);
if (!client->Unsubscribe(
unsub, [&](std::shared_ptr<Mqtt5::Mqtt5Client>, int, std::shared_ptr<Mqtt5::UnSubAckPacket>) {
unsubscribeFinishedPromise.set_value();
}))
unsub->WithTopicFilter(cmdData.input_topic);
if (!client->Unsubscribe(unsub, [&](int, std::shared_ptr<Mqtt5::UnSubAckPacket>) {
unsubscribeFinishedPromise.set_value();
}))
{
fprintf(stdout, "Unsubscription failed.\n");
exit(-1);
Expand Down
Loading