-
Notifications
You must be signed in to change notification settings - Fork 639
Add subscription manager for MQTT #1094
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report
@@ Coverage Diff @@
## development #1094 +/- ##
===============================================
+ Coverage 96.54% 98.46% +1.91%
===============================================
Files 9 4 -5
Lines 5643 1301 -4342
Branches 641 383 -258
===============================================
- Hits 5448 1281 -4167
+ Misses 9 0 -9
+ Partials 186 20 -166
Continue to review full report at Codecov.
|
|
||
/* Check for an exact match if the incoming topic name and the registered | ||
* topic filter length match. */ | ||
if( topicNameLength == topicFilterLength ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might need to adjust the logic on this, since the topic filter/a
should match the filter filter/#
, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have updated the logic to accommodate the case
/** | ||
* @brief The registry to store records of topic filters and their subscription callbacks. | ||
*/ | ||
static SubscriptionManager_Record_t callbackRecordList[ MAX_SUBSCRIPTION_CALLBACK_RECORDS ] = { 0 }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So applications won't have access to the subscription list, they can only interface with it through the subscription manager. Is this intended?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that is the intended use of the manager here as a singleton. The alternative would be to use a context for the subscription manager that the application would pass for multiple instances of subscription manager.
I think for the purposes of both a single and multi-threaded demo, a singleton subscription manager is suffice for now. If we have a need for multiple subscription managers in the future, we can expose the context to be application provided
topicNameLength, | ||
nameIndex, | ||
filterIndex, | ||
&status ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So &status
is passed as the pMatch
parameter of this function, and this function's return value (called status
in its implementation) is assigned to matchFound
. I think these names are a little confusing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's a good point. I had most of the code as-is from the v4_beta branch here.
Have updated the names to be related to their meaning.
else | ||
{ | ||
/* Should the topic string be copied? */ | ||
callbackRecordList[ availableIndex ].pTopicFilter = pTopicFilter; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the subscription manager supposed to be specific to one demo? If so, I'm wondering why it is in its own directory. If not, I think this should be changed to a static buffer and memcpy, since if anyone tries to tweak the demo then they'll see issues due to this code that's in a completely different directory
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The subscription manager is meant to be used by other service client libraries (like Shadow), and is thus, placed in its own folder along with creation of its library target so that demo targets can link against it.
Is your concern that the customer may tweak the demo to use a stack based string pointer instead of a global, and thus, run into memory-out-of scope access problems?
Would it be fine to mitigate the concern by adding comments in the demo code about keeping the topic filters as global string literals?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we should at least document this somewhere, such as where SubscriptionManager_Record_t is typedef'ed. We should mention the limitation of using a const char * instead of a static buffer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a @note
in the documentation of the RegisterCallback
function
else | ||
{ | ||
/* Should the topic string be copied? */ | ||
callbackRecordList[ availableIndex ].pTopicFilter = pTopicFilter; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we should at least document this somewhere, such as where SubscriptionManager_Record_t is typedef'ed. We should mention the limitation of using a const char * instead of a static buffer
* topic filter length match. */ | ||
if( topicNameLength == topicFilterLength ) | ||
{ | ||
status = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 ) ? true : false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're not subjecting this to MISRA, then I think we should leave out the extraneous ternary
status = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 ) ? true : false; | |
status = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer that we keep it to be MISRA compliant, as we could consider making this a library (as subscription manager is needed by all MQTT based service client library demos)
|
||
/* Match against wildcard characters in topic filter only if the incoming | ||
* PUBLISH topic name does not start with a "$" character. */ | ||
if( ( status == false ) && ( pTopicName[ 0 ] != '$' ) ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See Topics beginning with $. What if the topic name is $SYS/monitor
and the topic filter is $SYS/#
? Do we not want to match this as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! Have updated the logic to allow wildcard matching for reserved topics when wildcards are at a non-beginning position
( strncmp( pTopicFilter, callbackRecordList[ availableIndex ].pTopicFilter, topicFilterLength ) | ||
== 0 ) ) | ||
{ | ||
recordExists = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we mentioned this in the other PR, but we ought to document that this doesn't allow multiple callbacks for the same topic filter. I think it's plausible that more than one application wants to receive messages from the same topic filter, in which case we should point to the code that needs to be changed in order to allow it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a @note
in the API documentation of the function in the header file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that adding documentation for supporting multiple callback registration per topic filter is necessarily required. (The source code is available for viewing and tweaking).
If we really think that there can be a customer need for multiple callbacks per topic filter, then we should just support that in the subscription manager. From the perspective of a single threaded application, I don;t see a reason why multiple callbacks per topic filter would be needed.
if( ( availableIndex == MAX_SUBSCRIPTION_CALLBACK_RECORDS ) ) | ||
{ | ||
/* The record list is full. */ | ||
LogError( ( "Unable to register callback: Registry list is full: TopicFilter=%.*s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case might be helpful to log what the max capacity actually is
Co-authored-by: Muneeb Ahmed <[email protected]>
…t-device-sdk-embedded-C into add-subscription-manager
@@ -0,0 +1,17 @@ | |||
set( LIBRARY_NAME "mqtt_subscription_manager" ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be called DEMO_NAME.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is being added as a library instead of a demo binary so that multiple demos can call it (and thereby, link against it)
target_link_libraries( | ||
${LIBRARY_NAME} | ||
PRIVATE | ||
mqtt |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it mean that Subscription manager links to MQTT? Should be other way round?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mqtt
library does not have any dependency on the subscription manager, but subscription manager depends on the mqtt for its types like MQTTPublishInfo_t
.
Practically, only a header dependency on mqtt exists for the subscription manager; so a linking dependency is not strictly required. (I can remove this linking dependency if you think that will be better, but the header include path dependency would definitely remain)
static SubscriptionManager_Record_t callbackRecordList[ MAX_SUBSCRIPTION_CALLBACK_RECORDS ] = { 0 }; | ||
|
||
/** | ||
* @brief Handle special corner cases regarding wildcards at the end of topic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The description is vague. Please elaborate the description.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated description to mention the special cases handled in the utility
uint16_t filterIndex ); | ||
|
||
/** | ||
* @brief Attempt to match characters in a topic filter by wildcards. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please elaborate the description. If the current character in the topic filter is '+', it consumes one level form the topic name. If the If the current character in the topic filter is '#', it assumes that the topic name matches and stops matching.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added description for the matching logic performed in the function
Co-authored-by: Gaurav-Aggarwal-AWS <[email protected]>
… functions based on review feedback
Co-authored-by: Gaurav-Aggarwal-AWS <[email protected]>
…ded-C into add-subscription-manager
/** | ||
* @brief Success return value from Subscription Manager API. | ||
*/ | ||
SUBSCRIPTION_MANAGER_SUCCESS = 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Usually 0 means success, is there any special reason for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dan4thewin has generally suggested to avoid using 0 as a success value as it could be a statically allocated default value assigned for variables and result in false positive validation in tests. We had followed the same philosophy for the simplied taskpool's enumeration values: https://github.com/FreeRTOS/FreeRTOS/blob/lts-development/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta1/abstractions/platform/freertos/private/taskpool/taskpool_freertos.h#L43-L64
Co-authored-by: Muneeb Ahmed <[email protected]>
…t-device-sdk-embedded-C into add-subscription-manager
Co-authored-by: Muneeb Ahmed <[email protected]>
…t-device-sdk-embedded-C into add-subscription-manager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some rewording
Co-authored-by: Muneeb Ahmed <[email protected]>
Description of changes:
Add a subscription manager library (under
/demos
folder) for handling multiple subscriptions to using the same MQTT connection context.Purpose
The subscription manager allows application to register different callbacks for the subscribed topic filters so that incoming PUBLISH messages on the MQTT connection can be dispatched to the specific callback of the matching topic filter.
Note: This PR sanitizes the draft implementation of the library shown in the draft PR #1091
Testing: A follow-up PR #1098 exhibits the use of the subscription manager in an MQTT demo (and thus, the subscription manager logic is tested through the demo)
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.