diff --git a/app/code/Magento/Cron/Model/DeadlockRetrier.php b/app/code/Magento/Cron/Model/DeadlockRetrier.php index 15497910a089b..ab180e93e0ca3 100644 --- a/app/code/Magento/Cron/Model/DeadlockRetrier.php +++ b/app/code/Magento/Cron/Model/DeadlockRetrier.php @@ -17,6 +17,20 @@ */ class DeadlockRetrier implements DeadlockRetrierInterface { + /** + * @var \Psr\Log\LoggerInterface + */ + private $logger; + + /** + * @param \Psr\Log\LoggerInterface $logger + */ + public function __construct( + \Psr\Log\LoggerInterface $logger + ) { + $this->logger = $logger; + } + /** * @inheritdoc */ @@ -30,6 +44,7 @@ public function execute(callable $callback, AdapterInterface $connection) try { return $callback(); } catch (DeadlockException $e) { + $this->logger->warning(sprintf("Deadlock detected in cron: %s", $e->getMessage())); continue; } } diff --git a/app/code/Magento/Cron/Model/ResourceModel/Schedule.php b/app/code/Magento/Cron/Model/ResourceModel/Schedule.php index 25ebaec5582c9..120e0ce6432c5 100644 --- a/app/code/Magento/Cron/Model/ResourceModel/Schedule.php +++ b/app/code/Magento/Cron/Model/ResourceModel/Schedule.php @@ -65,31 +65,47 @@ public function trySetJobStatusAtomic($scheduleId, $newStatus, $currentStatus) public function trySetJobUniqueStatusAtomic($scheduleId, $newStatus, $currentStatus) { $connection = $this->getConnection(); + $connection->beginTransaction(); // this condition added to avoid cron jobs locking after incorrect termination of running job $match = $connection->quoteInto( 'existing.job_code = current.job_code ' . - 'AND (existing.executed_at > UTC_TIMESTAMP() - INTERVAL 1 DAY OR existing.executed_at IS NULL) ' . - 'AND existing.status = ?', + 'AND existing.status = ? ' . + 'AND (existing.executed_at > UTC_TIMESTAMP() - INTERVAL 1 DAY OR existing.executed_at IS NULL)', $newStatus ); + // Select and lock all related schedules - this prevents deadlock in case cron overlaps and two jobs of + // the same code attempt to lock at the same time, and force them to serialize $selectIfUnlocked = $connection->select() + ->from( + ['current' => $this->getTable('cron_schedule')], + [] + ) ->joinLeft( ['existing' => $this->getTable('cron_schedule')], $match, - ['status' => new \Zend_Db_Expr($connection->quote($newStatus))] + ['existing.schedule_id'] ) ->where('current.schedule_id = ?', $scheduleId) ->where('current.status = ?', $currentStatus) - ->where('existing.schedule_id IS NULL'); - - $update = $connection->updateFromSelect($selectIfUnlocked, ['current' => $this->getTable('cron_schedule')]); - $result = $connection->query($update)->rowCount(); + ->forUpdate(true); - if ($result == 1) { - return true; + $scheduleId = $connection->fetchOne($selectIfUnlocked); + if (!empty($scheduleId)) { + // Existing running schedule found + $connection->commit(); + return false; } - return false; + + // Mark our schedule as running + $connection->update( + $this->getTable('cron_schedule'), + ['status' => new \Zend_Db_Expr($connection->quote($newStatus))], + ['schedule_id = ?' => $scheduleId] + ); + + $connection->commit(); + return true; } } diff --git a/app/code/Magento/Cron/Observer/ProcessCronQueueObserver.php b/app/code/Magento/Cron/Observer/ProcessCronQueueObserver.php index acffba02eb461..0f266b5d62d83 100644 --- a/app/code/Magento/Cron/Observer/ProcessCronQueueObserver.php +++ b/app/code/Magento/Cron/Observer/ProcessCronQueueObserver.php @@ -9,6 +9,7 @@ */ namespace Magento\Cron\Observer; +use Magento\Cron\Model\ResourceModel\Schedule\Collection as ScheduleCollection; use Magento\Cron\Model\Schedule; use Magento\Framework\App\State; use Magento\Framework\Console\Cli; @@ -83,7 +84,7 @@ class ProcessCronQueueObserver implements ObserverInterface const MAX_RETRIES = 5; /** - * @var \Magento\Cron\Model\ResourceModel\Schedule\Collection + * @var ScheduleCollection */ protected $_pendingSchedules; @@ -278,12 +279,12 @@ function ($groupId) use ($currentTime) { * * It should be taken by standalone (child) process, not by the parent process. * - * @param int $groupId + * @param string $groupId * @param callable $callback * * @return void */ - private function lockGroup($groupId, callable $callback) + private function lockGroup(string $groupId, callable $callback): void { if (!$this->lockManager->lock(self::LOCK_PREFIX . $groupId, self::LOCK_TIMEOUT)) { $this->logger->warning( @@ -399,7 +400,7 @@ function () use ($schedule) { * @param string $jobName * @return void */ - private function startProfiling(string $jobName = '') + private function startProfiling(string $jobName = ''): void { $this->statProfiler->clear(); $this->statProfiler->start( @@ -416,7 +417,7 @@ private function startProfiling(string $jobName = '') * @param string $jobName * @return void */ - private function stopProfiling(string $jobName = '') + private function stopProfiling(string $jobName = ''): void { $this->statProfiler->stop( sprintf(self::CRON_TIMERID, $jobName), @@ -445,9 +446,9 @@ private function getProfilingStat(string $jobName): string * Return job collection from data base with status 'pending'. * * @param string $groupId - * @return \Magento\Cron\Model\ResourceModel\Schedule\Collection + * @return ScheduleCollection */ - private function getPendingSchedules($groupId) + private function getPendingSchedules(string $groupId): ScheduleCollection { $jobs = $this->_config->getJobs(); $pendingJobs = $this->_scheduleFactory->create()->getCollection(); @@ -462,7 +463,7 @@ private function getPendingSchedules($groupId) * @param string $groupId * @return $this */ - private function generateSchedules($groupId) + private function generateSchedules(string $groupId): self { /** * check if schedule generation is needed @@ -533,13 +534,13 @@ protected function _generateJobs($jobs, $exists, $groupId) * @param int $currentTime * @return void */ - private function cleanupJobs($groupId, $currentTime) + private function cleanupJobs(string $groupId, int $currentTime): void { // check if history cleanup is needed $lastCleanup = (int)$this->_cache->load(self::CACHE_KEY_LAST_HISTORY_CLEANUP_AT . $groupId); $historyCleanUp = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_CLEANUP_EVERY); if ($lastCleanup > $this->dateTime->gmtTimestamp() - $historyCleanUp * self::SECONDS_IN_MINUTE) { - return $this; + return; } // save time history cleanup was ran with no expiration $this->_cache->save( @@ -550,6 +551,7 @@ private function cleanupJobs($groupId, $currentTime) ); $this->cleanupDisabledJobs($groupId); + $this->cleanupRunningJobs($groupId); $historySuccess = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_SUCCESS); $historyFailure = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_FAILURE); @@ -673,7 +675,7 @@ protected function getScheduleTimeInterval($groupId) * @param string $groupId * @return void */ - private function cleanupDisabledJobs($groupId) + private function cleanupDisabledJobs(string $groupId): void { $jobs = $this->_config->getJobs(); $jobsToCleanup = []; @@ -696,6 +698,33 @@ private function cleanupDisabledJobs($groupId) } } + /** + * Cleanup jobs that were left in a running state due to an unexpected stop + * + * @param string $groupId + * @return void + */ + private function cleanupRunningJobs(string $groupId): void + { + $scheduleResource = $this->_scheduleFactory->create()->getResource(); + $connection = $scheduleResource->getConnection(); + + $jobs = $this->_config->getJobs(); + + $connection->update( + $scheduleResource->getTable('cron_schedule'), + [ + 'status' => \Magento\Cron\Model\Schedule::STATUS_ERROR, + 'messages' => 'Time out' + ], + [ + $connection->quoteInto('status = ?', \Magento\Cron\Model\Schedule::STATUS_RUNNING), + $connection->quoteInto('job_code IN (?)', array_keys($jobs[$groupId])), + 'scheduled_at < UTC_TIMESTAMP() - INTERVAL 1 DAY' + ] + ); + } + /** * Get cron expression of cron job. * @@ -773,13 +802,13 @@ private function isGroupInFilter($groupId): bool * @param array $jobsRoot * @param int $currentTime */ - private function processPendingJobs($groupId, $jobsRoot, $currentTime) + private function processPendingJobs(string $groupId, array $jobsRoot, int $currentTime): void { - $procesedJobs = []; + $processedJobs = []; $pendingJobs = $this->getPendingSchedules($groupId); /** @var Schedule $schedule */ foreach ($pendingJobs as $schedule) { - if (isset($procesedJobs[$schedule->getJobCode()])) { + if (isset($processedJobs[$schedule->getJobCode()])) { // process only on job per run continue; } @@ -796,7 +825,7 @@ private function processPendingJobs($groupId, $jobsRoot, $currentTime) $this->tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId); if ($schedule->getStatus() === Schedule::STATUS_SUCCESS) { - $procesedJobs[$schedule->getJobCode()] = true; + $processedJobs[$schedule->getJobCode()] = true; } $this->retrier->execute( @@ -821,7 +850,7 @@ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, { // use sha1 to limit length // phpcs:ignore Magento2.Security.InsecureFunction - $lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode()); + $lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode()); try { for ($retries = self::MAX_RETRIES; $retries > 0; $retries--) { diff --git a/app/code/Magento/Cron/Test/Unit/Model/DeadlockRetrierTest.php b/app/code/Magento/Cron/Test/Unit/Model/DeadlockRetrierTest.php index 60eaa091a761f..36e4537383aa6 100644 --- a/app/code/Magento/Cron/Test/Unit/Model/DeadlockRetrierTest.php +++ b/app/code/Magento/Cron/Test/Unit/Model/DeadlockRetrierTest.php @@ -13,6 +13,7 @@ use Magento\Framework\DB\Adapter\AdapterInterface; use Magento\Framework\DB\Adapter\DeadlockException; use PHPUnit\Framework\MockObject\MockObject; +use Psr\Log\LoggerInterface; class DeadlockRetrierTest extends \PHPUnit\Framework\TestCase { @@ -27,6 +28,11 @@ class DeadlockRetrierTest extends \PHPUnit\Framework\TestCase */ private $adapterMock; + /** + * @var LoggerInterface|MockObject + */ + private $loggerMock; + /** * @var AbstractModel|MockObject */ @@ -38,8 +44,9 @@ class DeadlockRetrierTest extends \PHPUnit\Framework\TestCase protected function setUp(): void { $this->adapterMock = $this->getMockForAbstractClass(AdapterInterface::class); + $this->loggerMock = $this->getMockForAbstractClass(LoggerInterface::class); $this->modelMock = $this->createMock(AbstractModel::class); - $this->retrier = new DeadlockRetrier(); + $this->retrier = new DeadlockRetrier($this->loggerMock); } /** @@ -75,6 +82,8 @@ public function testRetry(): void $this->modelMock->expects($this->exactly(DeadlockRetrierInterface::MAX_RETRIES)) ->method('getId') ->willThrowException(new DeadlockException()); + $this->loggerMock->expects($this->exactly(DeadlockRetrierInterface::MAX_RETRIES - 1)) + ->method('warning'); $this->retrier->execute( function () { @@ -95,6 +104,8 @@ public function testRetrySecond(): void $this->modelMock->expects($this->at(1)) ->method('getId') ->willReturn(2); + $this->loggerMock->expects($this->once()) + ->method('warning'); $this->retrier->execute( function () { diff --git a/app/code/Magento/Cron/Test/Unit/Observer/ProcessCronQueueObserverTest.php b/app/code/Magento/Cron/Test/Unit/Observer/ProcessCronQueueObserverTest.php index a48a3dd76b884..50e1d828d2720 100644 --- a/app/code/Magento/Cron/Test/Unit/Observer/ProcessCronQueueObserverTest.php +++ b/app/code/Magento/Cron/Test/Unit/Observer/ProcessCronQueueObserverTest.php @@ -773,15 +773,10 @@ function ($callback) { ->setMethods(['execute'])->getMock(); $testCronJob->expects($this->atLeastOnce())->method('execute')->with($schedule); - $this->objectManagerMock->expects( - $this->once() - )->method( - 'create' - )->with( - 'CronJob' - )->willReturn( - $testCronJob - ); + $this->objectManagerMock->expects($this->once()) + ->method('create') + ->with('CronJob') + ->willReturn($testCronJob); $this->cronQueueObserver->execute($this->observerMock); } @@ -1049,9 +1044,36 @@ public function testMissedJobsCleanedInTime() $this->scheduleCollectionMock->expects($this->any())->method('load')->willReturnSelf(); $scheduleMock->expects($this->any())->method('getCollection')->willReturn($this->scheduleCollectionMock); - $scheduleMock->expects($this->exactly(9))->method('getResource')->willReturn($this->scheduleResourceMock); - $this->scheduleFactoryMock->expects($this->exactly(10))->method('create')->willReturn($scheduleMock); + $scheduleMock->expects($this->exactly(10))->method('getResource')->willReturn($this->scheduleResourceMock); + $this->scheduleFactoryMock->expects($this->exactly(11))->method('create')->willReturn($scheduleMock); + + $connectionMock = $this->prepareConnectionMock($tableName); + + $this->scheduleResourceMock->expects($this->exactly(6)) + ->method('getTable') + ->with($tableName) + ->willReturn($tableName); + $this->scheduleResourceMock->expects($this->exactly(15)) + ->method('getConnection') + ->willReturn($connectionMock); + + $this->retrierMock->expects($this->exactly(5)) + ->method('execute') + ->willReturnCallback( + function ($callback) { + return $callback(); + } + ); + + $this->cronQueueObserver->execute($this->observerMock); + } + /** + * @param string $tableName + * @return AdapterInterface|MockObject + */ + private function prepareConnectionMock(string $tableName) + { $connectionMock = $this->getMockForAbstractClass(AdapterInterface::class); $connectionMock->expects($this->exactly(5)) @@ -1080,22 +1102,30 @@ public function testMissedJobsCleanedInTime() ) ->willReturn(1); - $this->scheduleResourceMock->expects($this->exactly(5)) - ->method('getTable') - ->with($tableName) - ->willReturn($tableName); - $this->scheduleResourceMock->expects($this->exactly(14)) - ->method('getConnection') - ->willReturn($connectionMock); - - $this->retrierMock->expects($this->exactly(5)) - ->method('execute') - ->willReturnCallback( - function ($callback) { - return $callback(); - } + $connectionMock->expects($this->any()) + ->method('quoteInto') + ->withConsecutive( + ['status = ?', \Magento\Cron\Model\Schedule::STATUS_RUNNING], + ['job_code IN (?)', ['test_job1']], + ) + ->willReturnOnConsecutiveCalls( + "status = 'running'", + "job_code IN ('test_job1')" ); - $this->cronQueueObserver->execute($this->observerMock); + $connectionMock->expects($this->once()) + ->method('update') + ->with( + $tableName, + ['status' => 'error', 'messages' => 'Time out'], + [ + "status = 'running'", + "job_code IN ('test_job1')", + 'scheduled_at < UTC_TIMESTAMP() - INTERVAL 1 DAY' + ] + ) + ->willReturn(0); + + return $connectionMock; } } diff --git a/app/code/Magento/Cron/etc/db_schema.xml b/app/code/Magento/Cron/etc/db_schema.xml index 609b435f8b39c..72b1428756898 100644 --- a/app/code/Magento/Cron/etc/db_schema.xml +++ b/app/code/Magento/Cron/etc/db_schema.xml @@ -21,16 +21,10 @@ - + - - - - - - - + diff --git a/app/code/Magento/Cron/etc/db_schema_whitelist.json b/app/code/Magento/Cron/etc/db_schema_whitelist.json index f0d6ebed8290f..2e5cc6e0a4618 100644 --- a/app/code/Magento/Cron/etc/db_schema_whitelist.json +++ b/app/code/Magento/Cron/etc/db_schema_whitelist.json @@ -13,7 +13,8 @@ "index": { "CRON_SCHEDULE_JOB_CODE": true, "CRON_SCHEDULE_SCHEDULED_AT_STATUS": true, - "CRON_SCHEDULE_SCHEDULE_ID_STATUS": true + "CRON_SCHEDULE_SCHEDULE_ID_STATUS": true, + "CRON_SCHEDULE_JOB_CODE_STATUS_SCHEDULED_AT": true }, "constraint": { "PRIMARY": true