Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ Full API documentation is available here: http://pubsubclient.knolleary.net

- It can only publish QoS 0 messages. It can subscribe at QoS 0 or QoS 1.
- The maximum message size, including header, is **128 bytes** by default. This
is configurable via `MQTT_MAX_PACKET_SIZE` in `PubSubClient.h`.
is configurable via `MQTT_MAX_PACKET_SIZE` in `PubSubClient.h` or at runtime
using PubSubClient::setBufferSize().
- The keepalive interval is set to 15 seconds by default. This is configurable
via `MQTT_KEEPALIVE` in `PubSubClient.h`.
- The client uses MQTT 3.1.1 by default. It can be changed to use MQTT 3.1 by
Expand Down
6 changes: 3 additions & 3 deletions examples/mqtt_auth/mqtt_auth.ino
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ void setup()
{
Ethernet.begin(mac, ip);
// Note - the default maximum packet size is 128 bytes. If the
// combined length of clientId, username and password exceed this,
// you will need to increase the value of MQTT_MAX_PACKET_SIZE in
// PubSubClient.h
// combined length of clientId, username and password exceed this use the
// following to increase the buffer size:
// client.setBufferSize(255);

if (client.connect("arduinoClient", "testuser", "testpass")) {
client.publish("outTopic","hello world");
Expand Down
52 changes: 47 additions & 5 deletions src/PubSubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,93 +12,125 @@ PubSubClient::PubSubClient() {
this->_client = NULL;
this->stream = NULL;
setCallback(NULL);
this->buffer_size = MQTT_MAX_PACKET_SIZE;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woudn't it be better to move the buffer allocation to a support function, rather than duplicate the same code in all constructors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i agree in principal but
i was trying to match the existing code style. (ie, explicitly calling everything in each constructor.)

@knolleary any opinion?

this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}

PubSubClient::PubSubClient(Client& client) {
this->_state = MQTT_DISCONNECTED;
setClient(client);
this->stream = NULL;
this->buffer_size = MQTT_MAX_PACKET_SIZE;
this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}

PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
this->_state = MQTT_DISCONNECTED;
setServer(addr, port);
setClient(client);
this->stream = NULL;
this->buffer_size = MQTT_MAX_PACKET_SIZE;
this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED;
setServer(addr,port);
setClient(client);
setStream(stream);
this->buffer_size = MQTT_MAX_PACKET_SIZE;
this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED;
setServer(addr, port);
setCallback(callback);
setClient(client);
this->stream = NULL;
this->buffer_size = MQTT_MAX_PACKET_SIZE;
this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED;
setServer(addr,port);
setCallback(callback);
setClient(client);
setStream(stream);
this->buffer_size = MQTT_MAX_PACKET_SIZE;
this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}

PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
this->_state = MQTT_DISCONNECTED;
setServer(ip, port);
setClient(client);
this->stream = NULL;
this->buffer_size = MQTT_MAX_PACKET_SIZE;
this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED;
setServer(ip,port);
setClient(client);
setStream(stream);
this->buffer_size = MQTT_MAX_PACKET_SIZE;
this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED;
setServer(ip, port);
setCallback(callback);
setClient(client);
this->stream = NULL;
this->buffer_size = MQTT_MAX_PACKET_SIZE;
this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED;
setServer(ip,port);
setCallback(callback);
setClient(client);
setStream(stream);
this->buffer_size = MQTT_MAX_PACKET_SIZE;
this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}

PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
this->_state = MQTT_DISCONNECTED;
setServer(domain,port);
setClient(client);
this->stream = NULL;
this->buffer_size = MQTT_MAX_PACKET_SIZE;
this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED;
setServer(domain,port);
setClient(client);
setStream(stream);
this->buffer_size = MQTT_MAX_PACKET_SIZE;
this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED;
setServer(domain,port);
setCallback(callback);
setClient(client);
this->stream = NULL;
this->buffer_size = MQTT_MAX_PACKET_SIZE;
this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED;
setServer(domain,port);
setCallback(callback);
setClient(client);
setStream(stream);
this->buffer_size = MQTT_MAX_PACKET_SIZE;
this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE);
}

PubSubClient::~PubSubClient() {
free(this->buffer);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should check if this->buffer is NULL or not since setBufferSize() can set it to NULL.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. according to the spec std::free() should treat a null pointer as a no-op.
from http://www.open-std.org/JTC1/SC22/wg14/www/docs/n1124.pdf :

The free function causes the space pointed to by ptr to be deallocated, that is, made available for further allocation. If ptr is a null pointer, no action occurs.

(it makes sense because std::malloc returns null pointer on failure.)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sweet, I haven't really coded C/C++ in ages and back then it was a no-no since freeing a null pointer would cause some environments to go belly up. :-)

}

boolean PubSubClient::connect(const char *id) {
Expand Down Expand Up @@ -266,13 +298,13 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
this->stream->write(digit);
}
}
if (len < MQTT_MAX_PACKET_SIZE) {
if (len < this->buffer_size) {
buffer[len] = digit;
}
len++;
}

if (!this->stream && len > MQTT_MAX_PACKET_SIZE) {
if (!this->stream && len > this->buffer_size) {
len = 0; // This will cause the packet to be ignored.
}

Expand Down Expand Up @@ -356,7 +388,7 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne

boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
if (connected()) {
if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) {
if (this->buffer_size < 5 + 2+strlen(topic) + plength) {
// Too long
return false;
}
Expand Down Expand Up @@ -471,7 +503,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
if (qos < 0 || qos > 1) {
return false;
}
if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
if (this->buffer_size < 9 + strlen(topic)) {
// Too long
return false;
}
Expand All @@ -492,7 +524,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
}

boolean PubSubClient::unsubscribe(const char* topic) {
if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
if (this->buffer_size < 9 + strlen(topic)) {
// Too long
return false;
}
Expand Down Expand Up @@ -586,3 +618,13 @@ PubSubClient& PubSubClient::setStream(Stream& stream){
int PubSubClient::state() {
return this->_state;
}

boolean PubSubClient::setBufferSize(uint16_t size) {
this->buffer = (uint8_t*)realloc(this->buffer, size);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use a temp variable. If realloc fails it will return a NULL pointer without freeing the original memory. Thus, if NULL is returned this method should not make any change to this->buffer since it is still valid.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. this makes sense.

@knolleary is this pull request likely to ever be merged? is it worth me doing this?

this->buffer_size = size;
return (this->buffer == NULL);
}

uint16_t PubSubClient::getBufferSize() {
return this->buffer_size;
}
9 changes: 7 additions & 2 deletions src/PubSubClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#define MQTT_VERSION MQTT_VERSION_3_1_1
#endif

// MQTT_MAX_PACKET_SIZE : Maximum packet size
// MQTT_MAX_PACKET_SIZE : Maximum packet size. Override with setBufferSize().
#ifndef MQTT_MAX_PACKET_SIZE
#define MQTT_MAX_PACKET_SIZE 128
#endif
Expand Down Expand Up @@ -83,7 +83,8 @@
class PubSubClient {
private:
Client* _client;
uint8_t buffer[MQTT_MAX_PACKET_SIZE];
uint8_t* buffer;
uint16_t buffer_size;
uint16_t nextMsgId;
unsigned long lastOutActivity;
unsigned long lastInActivity;
Expand Down Expand Up @@ -115,6 +116,8 @@ class PubSubClient {
PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client);
PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&);

~PubSubClient();

PubSubClient& setServer(IPAddress ip, uint16_t port);
PubSubClient& setServer(uint8_t * ip, uint16_t port);
PubSubClient& setServer(const char * domain, uint16_t port);
Expand All @@ -138,6 +141,8 @@ class PubSubClient {
boolean loop();
boolean connected();
int state();
boolean setBufferSize(uint16_t size);
uint16_t getBufferSize();
};


Expand Down
38 changes: 38 additions & 0 deletions tests/src/receive_spec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,43 @@ int test_receive_oversized_message() {
END_IT
}

int test_resize_buffer() {
IT("receives a message larger than the default maximum");
reset_callback();

ShimClient shimClient;
shimClient.setAllowConnect(true);

byte connack[] = { 0x20, 0x02, 0x00, 0x00 };
shimClient.respond(connack,4);

PubSubClient client(server, 1883, callback, shimClient);
int rc = client.connect((char*)"client_test1");
IS_TRUE(rc);

int length = MQTT_MAX_PACKET_SIZE+1;
client.setBufferSize(length);
byte publish[] = {0x30,length-2,0x0,0x5,0x74,0x6f,0x70,0x69,0x63,0x70,0x61,0x79,0x6c,0x6f,0x61,0x64};
byte bigPublish[length];
memset(bigPublish,'A',length);
bigPublish[length] = 'B';
memcpy(bigPublish,publish,16);
shimClient.respond(bigPublish,length);

rc = client.loop();

IS_TRUE(rc);

IS_TRUE(callback_called);
IS_TRUE(strcmp(lastTopic,"topic")==0);
IS_TRUE(lastLength == length-9);
IS_TRUE(memcmp(lastPayload,bigPublish+9,lastLength)==0);

IS_FALSE(shimClient.error());

END_IT
}

int test_receive_oversized_stream_message() {
IT("drops an oversized message");
reset_callback();
Expand Down Expand Up @@ -242,6 +279,7 @@ int main()
test_receive_stream();
test_receive_max_sized_message();
test_receive_oversized_message();
test_resize_buffer();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the merge request contains functionality to dynamically change the buffer size it would be suitable to have tests that verify that this works well. E g:

  • What happens if setBufferSize() fails to allocate the memory?
  • If the buffer size is shrunk will it fail (appropriately) to receive a message that was within the prior limits?
  • And the reverse, if it fails to receive a message due to size will increasing the buffer size accordingly help?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, all make sense. Let's see if this is likely to be merged first.

test_receive_oversized_stream_message();
test_receive_qos1();

Expand Down