Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@ jobs:
with:
fetch-depth: 0

- name: Setup Java 11 (for keytool)
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: '11'
java-package: 'jre'

- name: Setup PHP ${{ matrix.php-version }}
uses: shivammathur/setup-php@v2
with:
Expand Down Expand Up @@ -140,7 +133,7 @@ jobs:
sed -i "s|$GITHUB_WORKSPACE|/github/workspace|g" phpunit.report-junit.xml

- name: Run SonarQube analysis
uses: sonarsource/sonarcloud-github-action@v1.9
uses: sonarsource/sonarcloud-github-action@v2.0.2
if: matrix.run-sonarqube-analysis
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Expand Down
13 changes: 12 additions & 1 deletion src/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class Message
{
private MessageType $type;
private int $qualityOfService;
private bool $retained;
private ?int $messageId = null;
private ?string $topic = null;
private ?string $content = null;
Expand All @@ -30,11 +31,13 @@ class Message
*
* @param MessageType $type
* @param int $qualityOfService
* @param bool $retained
*/
public function __construct(MessageType $type, int $qualityOfService = 0)
public function __construct(MessageType $type, int $qualityOfService = 0, bool $retained = false)
{
$this->type = $type;
$this->qualityOfService = $qualityOfService;
$this->retained = $retained;
}

/**
Expand All @@ -53,6 +56,14 @@ public function getQualityOfService(): int
return $this->qualityOfService;
}

/**
* @return bool
*/
public function getRetained(): bool
{
return $this->retained;
}

/**
* @return int|null
*/
Expand Down
20 changes: 15 additions & 5 deletions src/MessageProcessors/Mqtt31MessageProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,9 @@ public function buildPublishCompleteMessage(int $messageId): string
public function parseAndValidateMessage(string $message): ?Message
{
$qualityOfService = 0;
$retained = false;
$data = '';
$result = $this->tryDecodeMessage($message, $command, $qualityOfService, $data);
$result = $this->tryDecodeMessage($message, $command, $qualityOfService, $retained, $data);

if ($result === false) {
throw new InvalidMessageException('The passed message could not be decoded.');
Expand All @@ -440,7 +441,7 @@ public function parseAndValidateMessage(string $message): ?Message
throw new ProtocolViolationException('Unexpected connection acknowledgement.');

case 0x03:
return $this->parseAndValidatePublishMessage($data, $qualityOfService);
return $this->parseAndValidatePublishMessage($data, $qualityOfService, $retained);

case 0x04:
return $this->parseAndValidatePublishAcknowledgementMessage($data);
Expand Down Expand Up @@ -484,10 +485,17 @@ public function parseAndValidateMessage(string $message): ?Message
* @param string $message
* @param int|null $command
* @param int|null $qualityOfService
* @param bool $retained
* @param string|null $data
* @return bool
*/
protected function tryDecodeMessage(string $message, int &$command = null, int &$qualityOfService = null, string &$data = null): bool
protected function tryDecodeMessage(
string $message,
int &$command = null,
int &$qualityOfService = null,
bool &$retained = false,
string &$data = null
): bool
{
// If we received no input, we can return immediately without doing work.
if (strlen($message) === 0) {
Expand All @@ -504,6 +512,7 @@ protected function tryDecodeMessage(string $message, int &$command = null, int &
$byte = $message[0];
$command = (int) (ord($byte) / 16);
$qualityOfService = (ord($byte) & 0x06) >> 1;
$retained = (bool) (ord($byte) & 0x01);

// Read the second byte of a message (remaining length).
// If the continuation bit (8) is set on the length byte, another byte will be read as length.
Expand Down Expand Up @@ -546,15 +555,16 @@ protected function tryDecodeMessage(string $message, int &$command = null, int &
*
* @param string $data
* @param int $qualityOfServiceLevel
* @param bool $retained
* @return Message|null
*/
protected function parseAndValidatePublishMessage(string $data, int $qualityOfServiceLevel): ?Message
protected function parseAndValidatePublishMessage(string $data, int $qualityOfServiceLevel, bool $retained): ?Message
{
$topicLength = (ord($data[0]) << 8) + ord($data[1]);
$topic = substr($data, 2, $topicLength);
$content = substr($data, ($topicLength + 2));

$message = new Message(MessageType::PUBLISH(), $qualityOfServiceLevel);
$message = new Message(MessageType::PUBLISH(), $qualityOfServiceLevel, $retained);

if ($qualityOfServiceLevel > self::QOS_AT_MOST_ONCE) {
if (strlen($content) < 2) {
Expand Down
7 changes: 4 additions & 3 deletions src/MqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ protected function handleMessage(Message $message): void
$message->getTopic(),
$message->getContent(),
2,
false
$message->getRetained()
);
$this->repository->addPendingIncomingMessage($pendingMessage);
} catch (PendingMessageAlreadyExistsException $e) {
Expand All @@ -772,7 +772,7 @@ protected function handleMessage(Message $message): void
}

// For QoS 0 and QoS 1 we can deliver right away.
$this->deliverPublishedMessage($message->getTopic(), $message->getContent(), $message->getQualityOfService());
$this->deliverPublishedMessage($message->getTopic(), $message->getContent(), $message->getQualityOfService(), $message->getRetained());
return;
}

Expand Down Expand Up @@ -821,7 +821,8 @@ protected function handleMessage(Message $message): void
$this->deliverPublishedMessage(
$pendingMessage->getTopicName(),
$pendingMessage->getMessage(),
$pendingMessage->getQualityOfServiceLevel()
$pendingMessage->getQualityOfServiceLevel(),
$pendingMessage->wantsToBeRetained()
);

$this->repository->removePendingIncomingMessage($message->getMessageId());
Expand Down
186 changes: 171 additions & 15 deletions tests/Feature/PublishSubscribeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,32 @@ class PublishSubscribeTest extends TestCase
{
public function publishSubscribeData(): array
{
return [
[false, 'test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []],
[false, 'test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']],
[false, 'test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']],
[false, 'test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']],
[false, 'test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']],
[false, 'test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
[false, 'test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
[true, 'test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []],
[true, 'test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']],
[true, 'test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']],
[true, 'test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']],
[true, 'test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']],
[true, 'test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
[true, 'test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
$data = [
[false, 'foo/bar/baz', 'foo/bar/baz', 'hello world', []],
[false, 'foo/bar/+', 'foo/bar/baz', 'hello world', ['baz']],
[false, 'foo/+/baz', 'foo/bar/baz', 'hello world', ['bar']],
[false, 'foo/#', 'foo/bar/baz', 'hello world', ['bar/baz']],
[false, 'foo/+/bar/#', 'foo/my/bar/baz', 'hello world', ['my', 'baz']],
[false, 'foo/+/bar/#', 'foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
[false, 'foo/bar/baz', 'foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
[true, 'foo/bar/baz', 'foo/bar/baz', 'hello world', []],
[true, 'foo/bar/+', 'foo/bar/baz', 'hello world', ['baz']],
[true, 'foo/+/baz', 'foo/bar/baz', 'hello world', ['bar']],
[true, 'foo/#', 'foo/bar/baz', 'hello world', ['bar/baz']],
[true, 'foo/+/bar/#', 'foo/my/bar/baz', 'hello world', ['my', 'baz']],
[true, 'foo/+/bar/#', 'foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
[true, 'foo/bar/baz', 'foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
];

// Because our tests are run against a real MQTT broker and some messages are retained,
// we need to prevent false-positives by giving each test case its own 'test space' using a random prefix.
for ($i = 0; $i < count($data); $i++) {
$prefix = 'test/' . uniqid('', true) . '/';
$data[$i][1] = $prefix . $data[$i][1];
$data[$i][2] = $prefix . $data[$i][2];
}

return $data;
}

/**
Expand Down Expand Up @@ -84,6 +94,57 @@ function (string $topic, string $message, bool $retained, array $wildcards) use
$subscriber->disconnect();
}

/**
* @dataProvider publishSubscribeData
*/
public function test_publishing_and_subscribing_using_quality_of_service_0_with_message_retention_works_as_intended(
bool $useBlockingSocket,
string $subscriptionTopicFilter,
string $publishTopic,
string $publishMessage,
array $matchedTopicWildcards
): void
{
// We publish a message from the first client, which disconnects before the other client even subscribes.
$publisher = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'publisher');
$publisher->connect(null, true);

$publisher->publish($publishTopic, $publishMessage, 0, true);

$publisher->disconnect();

// Because we need to make sure the message reached the broker, we delay the execution for a short period (100ms) intentionally.
// With higher QoS, this is replaced by awaiting delivery of the message.
usleep(100_000);

// We connect and subscribe to a topic using the second client.
$connectionSettings = (new ConnectionSettings())
->useBlockingSocket($useBlockingSocket);

$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
$subscriber->connect($connectionSettings, true);

$subscriber->subscribe(
$subscriptionTopicFilter,
function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $publishTopic, $publishMessage, $matchedTopicWildcards) {
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
$this->assertEquals($publishTopic, $topic);
$this->assertEquals($publishMessage, $message);
$this->assertTrue($retained);
$this->assertEquals($matchedTopicWildcards, $wildcards);

$subscriber->interrupt(); // This allows us to exit the test as soon as possible.
},
0
);

// Then we loop on the subscriber to (hopefully) receive the published message.
$subscriber->loop(true);

// Finally, we disconnect for a graceful shutdown on the broker side.
$subscriber->disconnect();
}

/**
* @dataProvider publishSubscribeData
*/
Expand Down Expand Up @@ -130,6 +191,54 @@ function (string $topic, string $message, bool $retained, array $wildcards) use
$subscriber->disconnect();
}

/**
* @dataProvider publishSubscribeData
*/
public function test_publishing_and_subscribing_using_quality_of_service_1_with_message_retention_works_as_intended(
bool $useBlockingSocket,
string $subscriptionTopicFilter,
string $publishTopic,
string $publishMessage,
array $matchedTopicWildcards
): void
{
// We publish a message from the first client, which disconnects before the other client even subscribes.
$publisher = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'publisher');
$publisher->connect(null, true);

$publisher->publish($publishTopic, $publishMessage, 1, true);
$publisher->loop(true, true);

$publisher->disconnect();

// We connect and subscribe to a topic using the second client.
$connectionSettings = (new ConnectionSettings())
->useBlockingSocket($useBlockingSocket);

$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
$subscriber->connect($connectionSettings, true);

$subscriber->subscribe(
$subscriptionTopicFilter,
function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $publishTopic, $publishMessage, $matchedTopicWildcards) {
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
$this->assertEquals($publishTopic, $topic);
$this->assertEquals($publishMessage, $message);
$this->assertTrue($retained);
$this->assertEquals($matchedTopicWildcards, $wildcards);

$subscriber->interrupt(); // This allows us to exit the test as soon as possible.
},
1
);

// Then we loop on the subscriber to (hopefully) receive the published message.
$subscriber->loop(true);

// Finally, we disconnect for a graceful shutdown on the broker side.
$subscriber->disconnect();
}

/**
* @dataProvider publishSubscribeData
*/
Expand Down Expand Up @@ -176,6 +285,53 @@ public function test_publishing_and_subscribing_using_quality_of_service_2_works
$subscriber->disconnect();
}

/**
* @dataProvider publishSubscribeData
*/
public function test_publishing_and_subscribing_using_quality_of_service_2_with_message_retention_works_as_intended(
bool $useBlockingSocket,
string $subscriptionTopicFilter,
string $publishTopic,
string $publishMessage,
array $matchedTopicWildcards
): void
{
// We publish a message from the first client. The loop is called until all QoS 2 handshakes are done.
$publisher = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'publisher');
$publisher->connect(null, true);

$publisher->publish($publishTopic, $publishMessage, 2, true);
$publisher->loop(true, true);

$publisher->disconnect();

// We connect and subscribe to a topic using the second client.
$connectionSettings = (new ConnectionSettings())
->useBlockingSocket($useBlockingSocket);

$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
$subscriber->connect($connectionSettings, true);

$subscription = function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $subscriptionTopicFilter, $publishTopic, $publishMessage, $matchedTopicWildcards) {
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
$this->assertEquals($publishTopic, $topic);
$this->assertEquals($publishMessage, $message);
$this->assertTrue($retained);
$this->assertEquals($matchedTopicWildcards, $wildcards);

$subscriber->unsubscribe($subscriptionTopicFilter);
$subscriber->interrupt(); // This allows us to exit the test as soon as possible.
};

$subscriber->subscribe($subscriptionTopicFilter, $subscription, 2);

// Then we loop on the subscriber to (hopefully) receive the published message until the receive handshake is done.
$subscriber->loop(true, true);

// Finally, we disconnect for a graceful shutdown on the broker side.
$subscriber->disconnect();
}

public function test_unsubscribe_stops_receiving_messages_on_topic(): void
{
// We connect and subscribe to a topic using the first client.
Expand Down