Skip to content

Conversation

aggarw13
Copy link
Contributor

@aggarw13 aggarw13 commented Jul 31, 2020

Subscription Manager For MQTT

Subscription Manager will be utilized in demo that exhibits connection sharing in a single-threaded demo

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

}
else
{
/* Should the topic string be copied? */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To accommodate varying use cases yes, so the pTopicFilter member of the SubscriptionManager_Record_t would need to be an array. However, since it's just a demo and you control everything, it's up to you; if you only use constant strings then I don't think it's a major issue as long as there's a comment calling that out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think for the single-threaded demo purpose, I will keep it simple and avoid the memory allocation problem (currently, the record list is statically allocated for a fixed size, but with local copy of the topic filter, it would require a max limit on the topic filter size, and thereby, increase the record list structure size)

{
recordFound = true;
}
} while( ( matchingRecordIndex < recordListSize ) && ( recordFound == false ) );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if more than one application registers a different callback to the same topic filter? Then this would only remove the first one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, the subscription manager is geared toward(s) a single-threaded demo that will showcase connection sharing for multiple libraries (like Jobs, Shadow) that need different callbacks for processing their topic-specific PUBLISH messages.

However, even in the case of a multi-threaded use-case, a single application's call to remove its callback shouldn't remove another application's callback for the same topic filter. In that case, the subscription manager would need to store some thread/application-specific information in the records.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a good point that Muneeb mentioned. What is the purpose of this Subscription manager? Is it only for a single task use case? Is there a plan to improve it to support it for connection sharing between multiple tasks?
If we are extending it to multiple tasks, yea we would need some way to identify the task from which it is called.


size_t recordListCursor = 0u;

while( findRecordForTopic( pPublishInfo->pTopicName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why findRecordForTopic is necessary. Since you're iterating over every entry in the list, why not just use a for loop in this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, removed


/* If an exact match is required, return the result of the comparison above.
* Otherwise, attempt to match with MQTT wildcards in topic filters. */
if( requiresExactMatch == false )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we ever want to set this to true? I see you don't and that makes sense to me, so I think this function could just be removed and topicFilterMatch used in its place

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took the topic matching logic from the v4_beta branch here.
Looking at the call-site of this function in the v4_beta implementation, it seems like this parameter was added for a different use-case (of updating registered callback against the same topic for which subscription already exists).

I will remove the requiresExactMatch logic

typedef struct SubscriptionManager_Record
{
const char * pTopicFilter;
size_t topicFilterLength;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a uint16_t, since filter lengths must fit in 2 bytes


typedef struct SubscriptionManager_Record
{
const char * pTopicFilter;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which module is responsible for topic filter memory? Will application be responsible for all topic filter buffers if it subscribes to different topics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the single-threaded application will be responsible to maintain the lifetime of the topic string memory that is provides to the subscription manager. (Ideally, in a multi-threaded use-case, the data should be copied into the record, but I am thinking of keeping it simple for the single-threaded demo)

#include <string.h>
#include "mqtt_subscription_manager.h"

#define MAX_SUBSCRIPTION_CALLBACK_RECORDS 5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would need to be the default value. we would need to wrap with #ifndef to this define

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will probably look to move it to the demo_config.h

filterIndex,
&nameIndex,
&status );
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we have to check for the topics beginning with $ as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the '$' prefix reservation is specific to AWS IoT, so I'd think no

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718107, "Topics beginning with $", it is part of the spec. Not really specific to AWS IoT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec mentions that "The Server MUST NOT match Topic Filters starting with a wildcard character (# or +) with Topic Names beginning with a $ character".
The client does not need to perform any special handling for topics starting with "$". Our service (Jobs, Shadow, Provisioning, etc) subscribe to these topics (starting with "$") and the topic matching on receiving incoming PUBLISHes only involves performing an exact character match for the starting "$" in the topic 🙂

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does the topic matching happens if there are 2 subscriptions in the client for topics “+/monitor/Clients” and “$SYS/monitor/Clients”?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, have added logic to avoid wildcard match for incoming PUBLISH topic names starting with the "$" character

callbackRecordList[ listIndex ].requiresExactMatch ) == true )
{
/* Invoke the callback associated with the record as the topics match. */
callbackRecordList[ listIndex++ ].callback( packetIdentifier, pPublishInfo );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will result in an infinite loop if matchTopic returns false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Fixed

( strncmp( pTopicFilter, pRecord->pTopicFilter, topicFileterLength ) == 0 ) )
{
recordFound = true;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possible Infinite loop. matchingRecordIndex needs to be incremented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed!

( void ) SubscriptionManager_RemoveCallback( DEMO_APP_1_TOPIC,
strlen( DEMO_APP_1_TOPIC ) );
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic of the above two blocks is repeated twice more, why not make it a function?

LogInfo( ( "Unsubscribing from the MQTT topic %.*s.",
strlen( DEMO_APP_1_TOPIC ),
DEMO_APP_1_TOPIC ) );
returnStatus = unsubscribeFromTopic( &mqttContext, DEMO_APP_1_TOPIC );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the callbacks be removed from the subscription manager?

/* Start with everything at 0. */
( void ) memset( ( void * ) pSubscriptionList, 0x00, sizeof( pSubscriptionList ) );

/* This example subscribes to only one topic and uses QOS2. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment isn't accurate anymore

( void ) memset( ( void * ) pSubscriptionList, 0x00, sizeof( pSubscriptionList ) );

/* This example subscribes to and unsubscribes from only one topic
* and uses QOS2. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here


handleIncomingPublish( pPublishInfo );

/* Send a Publish to the notify topic to signal that the App 1 callback has been invoked! */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

App 2

@sarenameas
Copy link
Contributor

So this will be closed?

@aggarw13 aggarw13 closed this Aug 19, 2020
@aggarw13 aggarw13 deleted the mqtt-demo/connection-sharing branch August 19, 2020 23:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants