Skip to content

Commit b8cfbe1

Browse files
authored
Merge branch '2.4-develop' into 2.4-develop
2 parents 854df7e + f3e8f25 commit b8cfbe1

File tree

8 files changed

+239
-50
lines changed

8 files changed

+239
-50
lines changed

app/code/Magento/MessageQueue/Console/StartConsumerCommand.php

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class StartConsumerCommand extends Command
2323
const OPTION_BATCH_SIZE = 'batch-size';
2424
const OPTION_AREACODE = 'area-code';
2525
const OPTION_SINGLE_THREAD = 'single-thread';
26+
const OPTION_MULTI_PROCESS = 'multi-process';
2627
const PID_FILE_PATH = 'pid-file-path';
2728
const COMMAND_QUEUE_CONSUMERS_START = 'queue:consumers:start';
2829

@@ -42,9 +43,6 @@ class StartConsumerCommand extends Command
4243
private $lockManager;
4344

4445
/**
45-
* StartConsumerCommand constructor.
46-
* {@inheritdoc}
47-
*
4846
* @param \Magento\Framework\App\State $appState
4947
* @param ConsumerFactory $consumerFactory
5048
* @param string $name
@@ -78,6 +76,12 @@ protected function execute(InputInterface $input, OutputInterface $output)
7876
}
7977

8078
$singleThread = $input->getOption(self::OPTION_SINGLE_THREAD);
79+
$multiProcess = $input->getOption(self::OPTION_MULTI_PROCESS);
80+
81+
if ($multiProcess && !$this->lockManager->lock(md5($consumerName . '-' . $multiProcess),0)) { //phpcs:ignore
82+
$output->writeln('<error>Consumer with the same name is running</error>');
83+
return \Magento\Framework\Console\Cli::RETURN_FAILURE;
84+
}
8185

8286
if ($singleThread && !$this->lockManager->lock(md5($consumerName),0)) { //phpcs:ignore
8387
$output->writeln('<error>Consumer with the same name is running</error>');
@@ -88,9 +92,13 @@ protected function execute(InputInterface $input, OutputInterface $output)
8892

8993
$consumer = $this->consumerFactory->get($consumerName, $batchSize);
9094
$consumer->process($numberOfMessages);
95+
9196
if ($singleThread) {
9297
$this->lockManager->unlock(md5($consumerName)); //phpcs:ignore
9398
}
99+
if ($multiProcess) {
100+
$this->lockManager->unlock(md5($consumerName . '-' . $multiProcess)); //phpcs:ignore
101+
}
94102

95103
return \Magento\Framework\Console\Cli::RETURN_SUCCESS;
96104
}
@@ -133,6 +141,12 @@ protected function configure()
133141
InputOption::VALUE_NONE,
134142
'This option prevents running multiple copies of one consumer simultaneously.'
135143
);
144+
$this->addOption(
145+
self::OPTION_MULTI_PROCESS,
146+
null,
147+
InputOption::VALUE_OPTIONAL,
148+
'The number of processes per consumer.'
149+
);
136150
$this->addOption(
137151
self::PID_FILE_PATH,
138152
null,
@@ -161,11 +175,15 @@ protected function configure()
161175
162176
To do not run multiple copies of one consumer simultaneously:
163177
164-
<comment>%command.full_name% someConsumer --single-thread'</comment>
178+
<comment>%command.full_name% someConsumer --single-thread</comment>
165179
166180
To save PID enter path (This option is deprecated, use --single-thread instead):
167181
168182
<comment>%command.full_name% someConsumer --pid-file-path='/var/someConsumer.pid'</comment>
183+
184+
To define the number of processes per consumer:
185+
186+
<comment>%command.full_name% someConsumer --multi-process=4</comment>
169187
HELP
170188
);
171189
parent::configure();

app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ class ConsumersRunner
6060
private $logger;
6161

6262
/**
63-
* Lock Manager
64-
*
6563
* @var LockManagerInterface
6664
*/
6765
private $lockManager;
@@ -107,10 +105,13 @@ public function __construct(
107105

108106
/**
109107
* Runs consumers processes
108+
*
109+
* @SuppressWarnings(PHPMD.CyclomaticComplexity)
110110
*/
111-
public function run()
111+
public function run(): void
112112
{
113113
$runByCron = $this->deploymentConfig->get('cron_consumers_runner/cron_run', true);
114+
$multipleProcesses = $this->deploymentConfig->get('cron_consumers_runner/multiple_processes', []);
114115

115116
if (!$runByCron) {
116117
return;
@@ -125,19 +126,43 @@ public function run()
125126
continue;
126127
}
127128

128-
$arguments = [
129-
$consumer->getName(),
130-
'--single-thread'
131-
];
132-
133-
if ($maxMessages) {
134-
$arguments[] = '--max-messages=' . min($consumer->getMaxMessages() ?? $maxMessages, $maxMessages);
129+
if (array_key_exists($consumer->getName(), $multipleProcesses)) {
130+
$numberOfProcesses = $multipleProcesses[$consumer->getName()];
131+
132+
for ($i = 1; $i <= $numberOfProcesses; $i++) {
133+
if ($this->lockManager->isLocked(md5($consumer->getName() . '-' . $i))) { //phpcs:ignore
134+
continue;
135+
}
136+
$arguments = [
137+
$consumer->getName(),
138+
'--multi-process=' . $i
139+
];
140+
141+
if ($maxMessages) {
142+
$arguments[] =
143+
'--max-messages=' . min($consumer->getMaxMessages() ?? $maxMessages, $maxMessages);
144+
}
145+
146+
$command = $php . ' ' . BP . '/bin/magento queue:consumers:start %s %s'
147+
. ($maxMessages ? ' %s' : '');
148+
149+
$this->shellBackground->execute($command, $arguments);
150+
}
151+
} else if (!$this->lockManager->isLocked(md5($consumer->getName()))) { //phpcs:ignore
152+
$arguments = [
153+
$consumer->getName(),
154+
'--single-thread'
155+
];
156+
157+
if ($maxMessages) {
158+
$arguments[] = '--max-messages=' . min($consumer->getMaxMessages() ?? $maxMessages, $maxMessages);
159+
}
160+
161+
$command = $php . ' ' . BP . '/bin/magento queue:consumers:start %s %s'
162+
. ($maxMessages ? ' %s' : '');
163+
164+
$this->shellBackground->execute($command, $arguments);
135165
}
136-
137-
$command = $php . ' ' . BP . '/bin/magento queue:consumers:start %s %s'
138-
. ($maxMessages ? ' %s' : '');
139-
140-
$this->shellBackground->execute($command, $arguments);
141166
}
142167
}
143168

@@ -157,10 +182,6 @@ private function canBeRun(ConsumerConfigItemInterface $consumerConfig, array $al
157182
return false;
158183
}
159184

160-
if ($this->lockManager->isLocked(md5($consumerName))) { //phpcs:ignore
161-
return false;
162-
}
163-
164185
$connectionName = $consumerConfig->getConnection();
165186
try {
166187
$this->mqConnectionTypeResolver->getConnectionType($connectionName);

app/code/Magento/MessageQueue/Test/Unit/Console/StartConsumerCommandTest.php

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -90,24 +90,26 @@ protected function setUp(): void
9090
*
9191
* @param string|null $pidFilePath
9292
* @param bool $singleThread
93+
* @param int|null $multiProcess
9394
* @param int $lockExpects
94-
* @param int $isLockedExpects
9595
* @param bool $isLocked
9696
* @param int $unlockExpects
9797
* @param int $runProcessExpects
9898
* @param int $expectedReturn
9999
* @return void
100+
* @throws \Exception
100101
* @dataProvider executeDataProvider
101102
*/
102103
public function testExecute(
103-
$pidFilePath,
104-
$singleThread,
105-
$lockExpects,
106-
$isLocked,
107-
$unlockExpects,
108-
$runProcessExpects,
109-
$expectedReturn
110-
) {
104+
?string $pidFilePath,
105+
bool $singleThread,
106+
?int $multiProcess,
107+
int $lockExpects,
108+
bool $isLocked,
109+
int $unlockExpects,
110+
int $runProcessExpects,
111+
int $expectedReturn
112+
): void {
111113
$areaCode = 'area_code';
112114
$numberOfMessages = 10;
113115
$batchSize = null;
@@ -121,19 +123,21 @@ public function testExecute(
121123
$input->expects($this->once())->method('getArgument')
122124
->with(StartConsumerCommand::ARGUMENT_CONSUMER)
123125
->willReturn($consumerName);
124-
$input->expects($this->exactly(5))->method('getOption')
126+
$input->expects($this->exactly(6))->method('getOption')
125127
->withConsecutive(
126128
[StartConsumerCommand::OPTION_NUMBER_OF_MESSAGES],
127129
[StartConsumerCommand::OPTION_BATCH_SIZE],
128130
[StartConsumerCommand::OPTION_AREACODE],
129131
[StartConsumerCommand::PID_FILE_PATH],
130-
[StartConsumerCommand::OPTION_SINGLE_THREAD]
132+
[StartConsumerCommand::OPTION_SINGLE_THREAD],
133+
[StartConsumerCommand::OPTION_MULTI_PROCESS]
131134
)->willReturnOnConsecutiveCalls(
132135
$numberOfMessages,
133136
$batchSize,
134137
$areaCode,
135138
$pidFilePath,
136-
$singleThread
139+
$singleThread,
140+
$multiProcess
137141
);
138142
$this->appState->expects($this->exactly($runProcessExpects))->method('setAreaCode')->with($areaCode);
139143
$consumer = $this->getMockBuilder(ConsumerInterface::class)
@@ -143,6 +147,10 @@ public function testExecute(
143147
->method('get')->with($consumerName, $batchSize)->willReturn($consumer);
144148
$consumer->expects($this->exactly($runProcessExpects))->method('process')->with($numberOfMessages);
145149

150+
if ($multiProcess !== null) {
151+
$consumerName .= '-' . $multiProcess;
152+
}
153+
146154
$this->lockManagerMock->expects($this->exactly($lockExpects))
147155
->method('lock')
148156
->with(md5($consumerName))//phpcs:ignore
@@ -161,12 +169,13 @@ public function testExecute(
161169
/**
162170
* @return array
163171
*/
164-
public function executeDataProvider()
172+
public function executeDataProvider(): array
165173
{
166174
return [
167175
[
168176
'pidFilePath' => null,
169177
'singleThread' => false,
178+
'multiProcess' => null,
170179
'lockExpects' => 0,
171180
'isLocked' => true,
172181
'unlockExpects' => 0,
@@ -176,6 +185,7 @@ public function executeDataProvider()
176185
[
177186
'pidFilePath' => '/var/consumer.pid',
178187
'singleThread' => true,
188+
'multiProcess' => null,
179189
'lockExpects' => 1,
180190
'isLocked' => true,
181191
'unlockExpects' => 1,
@@ -185,6 +195,27 @@ public function executeDataProvider()
185195
[
186196
'pidFilePath' => '/var/consumer.pid',
187197
'singleThread' => true,
198+
'multiProcess' => null,
199+
'lockExpects' => 1,
200+
'isLocked' => false,
201+
'unlockExpects' => 0,
202+
'runProcessExpects' => 0,
203+
'expectedReturn' => Cli::RETURN_FAILURE,
204+
],
205+
[
206+
'pidFilePath' => null,
207+
'singleThread' => false,
208+
'multiProcess' => 3,
209+
'lockExpects' => 1,
210+
'isLocked' => true,
211+
'unlockExpects' => 1,
212+
'runProcessExpects' => 1,
213+
'expectedReturn' => Cli::RETURN_SUCCESS,
214+
],
215+
[
216+
'pidFilePath' => null,
217+
'singleThread' => false,
218+
'multiProcess' => 3,
188219
'lockExpects' => 1,
189220
'isLocked' => false,
190221
'unlockExpects' => 0,

0 commit comments

Comments
 (0)