diff --git a/src/Broker.php b/src/Broker.php index b0553d43..08aa9208 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -7,12 +7,15 @@ use Kafka\Sasl\Plain; use Kafka\Sasl\Scram; use function array_keys; +use function count; +use function crc32; use function explode; use function in_array; use function serialize; use function shuffle; use function sprintf; use function strpos; +use function trim; class Broker { @@ -281,4 +284,20 @@ private function getSaslMechanismProvider(Config $config): SaslMechanism throw new Exception(sprintf('"%s" is an invalid SASL mechanism', $mechanism)); } + + /** + * @param mixed[] $record + */ + public function getPartitionId(array $record): int + { + $topicInfos = $this->getTopics(); + $topicMeta = $topicInfos[$record['topic']]; + $partNums = array_keys($topicMeta); + if (isset($record['key']) && trim($record['key'])) { + $partId = $partNums[crc32($record['key']) % count($partNums)]; + } else { + $partId = isset($record['partId'], $topicMeta[$record['partId']]) ? $record['partId'] : $partNums[0]; + } + return (int) $partId; + } } diff --git a/src/Producer/Process.php b/src/Producer/Process.php index ccb5d775..4636550e 100644 --- a/src/Producer/Process.php +++ b/src/Producer/Process.php @@ -10,7 +10,6 @@ use Kafka\ProducerConfig; use Kafka\Protocol; use Psr\Log\LoggerAwareTrait; -use function array_keys; use function count; use function explode; use function in_array; @@ -320,10 +319,7 @@ protected function convertRecordSet(array $recordSet): array $this->recordValidator->validate($record, $topics); $topicMeta = $topics[$record['topic']]; - $partNums = array_keys($topicMeta); - shuffle($partNums); - - $partId = ! isset($record['partId'], $topicMeta[$record['partId']]) ? $partNums[0] : $record['partId']; + $partId = $broker->getPartitionId($record); $brokerId = $topicMeta[$partId]; $topicData = []; diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index 71b1cb06..658a22dd 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -9,7 +9,6 @@ use Kafka\ProducerConfig; use Kafka\Protocol\Protocol; use Psr\Log\LoggerAwareTrait; -use function array_keys; use function count; use function explode; use function json_encode; @@ -161,10 +160,7 @@ protected function convertRecordSet(array $recordSet): array $this->recordValidator->validate($record, $topics); $topicMeta = $topics[$record['topic']]; - $partNums = array_keys($topicMeta); - shuffle($partNums); - - $partId = isset($record['partId'], $topicMeta[$record['partId']]) ? $record['partId'] : $partNums[0]; + $partId = $broker->getPartitionId($record); $brokerId = $topicMeta[$partId]; $topicData = []; diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index a788969d..61b641be 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -150,4 +150,58 @@ private function getBroker(): Broker { return Broker::getInstance(); } + + /** + * testGetPartitionId + * @access public + */ + public function testGetPartitionId(): void + { + $broker = Broker::getInstance(); + $data = [ + 'brokers' => [ + [ + 'host' => '127.0.0.1', + 'port' => '9092', + 'nodeId' => '0', + ], + [ + 'host' => '127.0.0.1', + 'port' => '9192', + 'nodeId' => '1', + ], + [ + 'host' => '127.0.0.1', + 'port' => '9292', + 'nodeId' => '2', + ], + ], + 'topics' => [ + [ + 'topicName' => 'test', + 'errorCode' => 0, + 'partitions' => [ + [ + 'partitionId' => 0, + 'errorCode' => 0, + 'leader' => 0, + ], + [ + 'partitionId' => 1, + 'errorCode' => 0, + 'leader' => 2, + ], + ], + ], + ], + ]; + $broker->setData($data['topics'], $data['brokers']); + $data = [ + 'partId' => '1', + 'topic' => 'test', + 'value' => 'test message', + ]; + $partId = $broker->getPartitionId($data); + $this->assertEquals('1', $partId); + } }