CronSuspendQueueDelayTest.php
Same filename in other branches
Namespace
Drupal\Tests\Core\CronFile
-
core/
tests/ Drupal/ Tests/ Core/ Cron/ CronSuspendQueueDelayTest.php
View source
<?php
declare (strict_types=1);
namespace Drupal\Tests\Core\Cron;
use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Config\ImmutableConfig;
use Drupal\Core\Cron;
use Drupal\Core\DependencyInjection\ContainerBuilder;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Core\Lock\LockBackendInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\Queue\QueueWorkerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\Session\AccountSwitcherInterface;
use Drupal\Core\State\StateInterface;
use Drupal\Tests\UnitTestCase;
use Psr\Log\LoggerInterface;
/**
* Test Cron handling of suspended queues with a delay.
*
* @group Cron
* @covers \Drupal\Core\Queue\SuspendQueueException
* @coversDefaultClass \Drupal\Core\Cron
*/
final class CronSuspendQueueDelayTest extends UnitTestCase {
/**
* Constructor arguments for \Drupal\Core\Cron.
*
* @var object[]|\PHPUnit\Framework\MockObject\MockObject[]
*/
protected $cronConstructorArguments;
/**
* A worker for testing.
*
* @var \Drupal\Core\Queue\QueueWorkerInterface|\PHPUnit\Framework\MockObject\MockObject
*/
protected $workerA;
/**
* A worker for testing.
*
* @var \Drupal\Core\Queue\QueueWorkerInterface|\PHPUnit\Framework\MockObject\MockObject
*/
protected $workerB;
/**
* {@inheritdoc}
*/
protected function setUp() : void {
parent::setUp();
$lock = $this->createMock(LockBackendInterface::class);
$lock->expects($this->any())
->method('acquire')
->willReturn(TRUE);
$this->cronConstructorArguments = [
'module_handler' => $this->createMock(ModuleHandlerInterface::class),
'lock' => $lock,
'queue_factory' => $this->createMock(QueueFactory::class),
'state' => $this->createMock(StateInterface::class),
'account_switcher' => $this->createMock(AccountSwitcherInterface::class),
'logger' => $this->createMock(LoggerInterface::class),
'queue_manager' => $this->createMock(QueueWorkerManagerInterface::class),
'time' => $this->createMock(TimeInterface::class),
'queue_config' => [],
];
// Capture error logs.
$config = $this->createMock(ImmutableConfig::class);
$config->expects($this->any())
->method('get')
->with('logging')
->willReturn(0);
$configFactory = $this->createMock(ConfigFactoryInterface::class);
$configFactory->expects($this->any())
->method('get')
->with('system.cron')
->willReturn($config);
$container = new ContainerBuilder();
$container->set('config.factory', $configFactory);
\Drupal::setContainer($container);
$this->workerA = $this->createMock(QueueWorkerInterface::class);
$this->workerA
->expects($this->any())
->method('getPluginDefinition')
->willReturn([
'cron' => [
'time' => 300,
],
]);
$this->workerB = $this->createMock(QueueWorkerInterface::class);
$this->workerB
->expects($this->any())
->method('getPluginDefinition')
->willReturn([
'cron' => [
'time' => 300,
],
]);
}
/**
* Tests a queue is reprocessed again after other queues.
*
* Two queues are created:
* - test_worker_a.
* - test_worker_b.
*
* Queues and items are processed:
* - test_worker_a:
* - item throws SuspendQueueException with 2.0 delay.
* - test_worker_b:
* - item executes normally.
* - test_worker_a:
* - item throws SuspendQueueException with 3.0 delay.
* - test_worker_a:
* - no items remaining, quits.
*/
public function testSuspendQueue() : void {
[
'queue_factory' => $queueFactory,
'queue_manager' => $queueManager,
'time' => $time,
] = $this->cronConstructorArguments;
$cron = $this->getMockBuilder(Cron::class)
->onlyMethods([
'usleep',
])
->setConstructorArgs($this->cronConstructorArguments)
->getMock();
$delays = [
2000000,
3000000,
];
$cron->expects($this->exactly(count($delays)))
->method('usleep')
->with($this->callback(function (int $delay) use (&$delays) : bool {
return array_shift($delays) === $delay;
}));
$queueManager->expects($this->once())
->method('getDefinitions')
->willReturn([
'test_worker_a' => [
'id' => 'test_worker_a',
'cron' => [
'time' => 300,
],
],
'test_worker_b' => [
'id' => 'test_worker_b',
'cron' => [
'time' => 300,
],
],
]);
$queueA = $this->createMock(QueueInterface::class);
$queueB = $this->createMock(QueueInterface::class);
$queueFactory->expects($this->exactly(2))
->method('get')
->willReturnMap([
[
'test_worker_a',
FALSE,
$queueA,
],
[
'test_worker_b',
FALSE,
$queueB,
],
]);
// Expect this queue to be processed twice.
$queueA->expects($this->exactly(3))
->method('claimItem')
->willReturnOnConsecutiveCalls((object) [
'data' => 'test_data_a1',
], (object) [
'data' => 'test_data_a2',
], FALSE);
// Expect this queue to be processed once.
$queueB->expects($this->exactly(2))
->method('claimItem')
->willReturnOnConsecutiveCalls((object) [
'data' => 'test_data_b1',
], FALSE);
$queueManager->expects($this->any())
->method('createInstance')
->willReturnMap([
[
'test_worker_a',
[],
$this->workerA,
],
[
'test_worker_b',
[],
$this->workerB,
],
]);
$this->workerA
->expects($this->exactly(2))
->method('processItem')
->with($this->anything())
->willReturnOnConsecutiveCalls($this->throwException(new SuspendQueueException('', 0, NULL, 2.0)), $this->throwException(new SuspendQueueException('', 0, NULL, 3.0)));
$this->workerB
->expects($this->once())
->method('processItem')
->with('test_data_b1');
$time->expects($this->any())
->method('getCurrentTime')
->willReturn(60);
$cron->run();
}
/**
* Tests queues may be re-processed by whether delay exceeds threshold.
*
* Cron will pause and reprocess a queue after a delay if a worker throws
* a SuspendQueueException with a delay time not exceeding the maximum wait
* config.
*
* @param float $threshold
* The configured threshold.
* @param float $suspendQueueDelay
* An interval in seconds a worker will suspend the queue.
* @param bool $expectQueueDelay
* Whether to expect cron to sleep and re-process the queue.
*
* @dataProvider providerSuspendQueueThreshold
*/
public function testSuspendQueueThreshold(float $threshold, float $suspendQueueDelay, bool $expectQueueDelay) : void {
$this->cronConstructorArguments['queue_config'] = [
'suspendMaximumWait' => $threshold,
];
[
'queue_factory' => $queueFactory,
'queue_manager' => $queueManager,
] = $this->cronConstructorArguments;
$cron = $this->getMockBuilder(Cron::class)
->onlyMethods([
'usleep',
])
->setConstructorArgs($this->cronConstructorArguments)
->getMock();
$cron->expects($expectQueueDelay ? $this->once() : $this->never())
->method('usleep');
$queueManager->expects($this->once())
->method('getDefinitions')
->willReturn([
'test_worker' => [
'id' => 'test_worker',
'cron' => 300,
],
]);
$queue = $this->createMock(QueueInterface::class);
$queueFactory->expects($this->once())
->method('get')
->willReturnMap([
[
'test_worker',
FALSE,
$queue,
],
]);
$queue->expects($this->exactly($expectQueueDelay ? 2 : 1))
->method('claimItem')
->willReturnOnConsecutiveCalls((object) [
'data' => 'test_data',
], FALSE);
$queueManager->expects($this->exactly(1))
->method('createInstance')
->with('test_worker')
->willReturn($this->workerA);
$this->workerA
->expects($this->once())
->method('processItem')
->with($this->anything())
->willReturnOnConsecutiveCalls($this->throwException(new SuspendQueueException('', 0, NULL, $suspendQueueDelay)));
$cron->run();
}
/**
* Data for testing.
*
* @return array
* Scenarios for testing.
*/
public static function providerSuspendQueueThreshold() : array {
$scenarios = [];
$scenarios['cron will wait for the queue, and rerun'] = [
15.0,
10.0,
TRUE,
];
$scenarios['cron will not wait for the queue, and exit'] = [
15.0,
20.0,
FALSE,
];
return $scenarios;
}
/**
* Tests queues are executed in order.
*
* If multiple queues are delayed, they must execute in order of time.
*/
public function testSuspendQueueOrder() : void {
[
'queue_factory' => $queueFactory,
'queue_manager' => $queueManager,
'time' => $time,
] = $this->cronConstructorArguments;
$cron = $this->getMockBuilder(Cron::class)
->onlyMethods([
'usleep',
])
->setConstructorArgs($this->cronConstructorArguments)
->getMock();
$cron->expects($this->any())
->method('usleep');
$queueManager->expects($this->once())
->method('getDefinitions')
->willReturn([
'test_worker_a' => [
'id' => 'test_worker_a',
'cron' => [
'time' => 300,
],
],
'test_worker_b' => [
'id' => 'test_worker_b',
'cron' => [
'time' => 300,
],
],
'test_worker_c' => [
'id' => 'test_worker_c',
'cron' => [
'time' => 300,
],
],
'test_worker_d' => [
'id' => 'test_worker_d',
'cron' => [
'time' => 300,
],
],
]);
$queueA = $this->createMock(QueueInterface::class);
$queueB = $this->createMock(QueueInterface::class);
$queueC = $this->createMock(QueueInterface::class);
$queueD = $this->createMock(QueueInterface::class);
$queueFactory->expects($this->exactly(4))
->method('get')
->willReturnMap([
[
'test_worker_a',
FALSE,
$queueA,
],
[
'test_worker_b',
FALSE,
$queueB,
],
[
'test_worker_c',
FALSE,
$queueC,
],
[
'test_worker_d',
FALSE,
$queueD,
],
]);
$queueA->expects($this->any())
->method('claimItem')
->willReturnOnConsecutiveCalls((object) [
'data' => 'test_data_from_queue_a',
], FALSE);
$queueB->expects($this->any())
->method('claimItem')
->willReturnOnConsecutiveCalls((object) [
'data' => 'test_data_from_queue_b',
], (object) [
'data' => 'test_data_from_queue_b',
], FALSE);
$queueC->expects($this->any())
->method('claimItem')
->willReturnOnConsecutiveCalls((object) [
'data' => 'test_data_from_queue_c',
], (object) [
'data' => 'test_data_from_queue_c',
], FALSE);
$queueD->expects($this->any())
->method('claimItem')
->willReturnOnConsecutiveCalls((object) [
'data' => 'test_data_from_queue_d',
], FALSE);
// Recycle the same worker for all queues to test order sanely:
$queueManager->expects($this->any())
->method('createInstance')
->willReturnMap([
[
'test_worker_a',
[],
$this->workerA,
],
[
'test_worker_b',
[],
$this->workerA,
],
[
'test_worker_c',
[],
$this->workerA,
],
[
'test_worker_d',
[],
$this->workerA,
],
]);
$queues = [
// All queues are executed in sequence of definition:
'test_data_from_queue_a',
'test_data_from_queue_b',
'test_data_from_queue_c',
'test_data_from_queue_d',
// Queue C is executed again, and before queue B.
'test_data_from_queue_c',
// Queue B is executed again, after queue C since its delay was longer.
'test_data_from_queue_b',
];
$this->workerA
->expects($this->exactly(count($queues)))
->method('processItem')
->with($this->callback(function ($queue) use (&$queues) : bool {
return array_shift($queues) === $queue;
}))
->willReturnOnConsecutiveCalls(NULL, $this->throwException(new SuspendQueueException('', 0, NULL, 16.0)), $this->throwException(new SuspendQueueException('', 0, NULL, 8.0)), NULL, NULL, NULL);
$currentTime = 60;
$time->expects($this->any())
->method('getCurrentTime')
->willReturnCallback(function () use (&$currentTime) : int {
return (int) $currentTime;
});
$time->expects($this->any())
->method('getCurrentMicroTime')
->willReturnCallback(function () use (&$currentTime) : float {
return (double) $currentTime;
});
$delays = [
// Expect to wait for 8 seconds, then accelerate time by 4 seconds.
4,
8000000,
// SuspendQueueException requests to delay by 16 seconds, but 4 seconds
// have passed above, so there are just 12 seconds remaining:
0,
12000000,
];
$cron->expects($this->exactly(count($delays) / 2))
->method('usleep')
->with($this->callback(function (int $delay) use (&$currentTime, &$delays) : bool {
$currentTime += array_shift($delays);
return array_shift($delays) === $delay;
}));
$cron->run();
}
}
Classes
Title | Deprecated | Summary |
---|---|---|
CronSuspendQueueDelayTest | Test Cron handling of suspended queues with a delay. |
Buggy or inaccurate documentation? Please file an issue. Need support? Need help programming? Connect with the Drupal community.