Queue items to allow later processing.
The queue system allows placing items in a queue and processing them later. The system tries to ensure that only one consumer can process an item.
Before a queue can be used it needs to be created by DrupalQueueInterface::createQueue().
Items can be added to the queue by passing an arbitrary data object to DrupalQueueInterface::createItem().
To process an item, call DrupalQueueInterface::claimItem() and specify how long you want to have a lease for working on that item. When finished processing, the item needs to be deleted by calling DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be made available again by the DrupalQueueInterface implementation once the lease expires. Another consumer will then be able to receive it when calling DrupalQueueInterface::claimItem(). Due to this, the processing code should be aware that an item might be handed over for processing more than once.
The $item object used by the DrupalQueueInterface can contain arbitrary metadata depending on the implementation. Systems using the interface should only rely on the data property which will contain the information passed to DrupalQueueInterface::createItem(). The full queue item returned by DrupalQueueInterface::claimItem() needs to be passed to DrupalQueueInterface::deleteItem() once processing is completed.
There are two kinds of queue backends available: reliable, which preserves the order of messages and guarantees that every item will be executed at least once. The non-reliable kind only does a best effort to preserve order in messages and to execute them at least once but there is a small chance that some items get lost. For example, some distributed back-ends like Amazon SQS will be managing jobs for a large set of producers and consumers where a strict FIFO ordering will likely not be preserved. Another example would be an in-memory queue backend which might lose items if it crashes. However, such a backend would be able to deal with significantly more writes than a reliable queue and for many tasks this is more important. See aggregator_cron() for an example of how to effectively utilize a non-reliable queue. Another example is doing Twitter statistics -- the small possibility of losing a few items is insignificant next to power of the queue being able to keep up with writes. As described in the processing section, regardless of the queue being reliable or not, the processing code should be aware that an item might be handed over for processing more than once (because the processing code might time out before it finishes).
Classes
|
Name |
Location | Description |
|---|---|---|
| DrupalQueue |
modules/ |
Factory class for interacting with queues. |
| MemoryQueue |
modules/ |
Static queue implementation. |
| SystemQueue |
modules/ |
Default queue implementation. |
Interfaces
|
Name |
Location | Description |
|---|---|---|
| DrupalQueueInterface |
modules/ |
|
| DrupalReliableQueueInterface |
modules/ |
Reliable queue interface. |
File
- modules/
system/ system.queue.inc, line 8 - Queue functionality.
Comments
Creating a queue
PermalinkYou first call DrupalQueue::get() to register the queue, like so:
<?php$queue = DrupalQueue::get('example_queue');
$queue->createQueue();
?>
Then you can create items etc.
Example of to create
PermalinkExample of to create items:
<?php$queue = DrupalQueue::get('aggregator_feeds');
foreach ($result as $feed) {
if ($queue->createItem($feed)) {
// Add timestamp to avoid queueing item more than once.
db_update('aggregator_feed')
->fields(array('queued' => REQUEST_TIME))
->condition('fid', $feed->fid)
->execute();
}
}
?>
Browse & interact with SystemQueue
PermalinkHere's Views Queue, a module that helps administrators to expose the SystemQueue in Views and perform VBO operations against the queue.
Link: http://drupal.org/project/views_queue
Play around with a queue
PermalinkTo get a sense of how queues work, try out http://d7.drupalexamples.info/queue_example/insert_remove .
Cron queues
PermalinkFor queues run during cron, check out http://api.drupal.org/api/drupal/modules--system--system.api.php/functio...
And this example on Level Ten's blog.
Level Ten Blog Is Very Helpful
PermalinkUntil I read the L10 blog post with a simple example implementation of a queue, I could not figure out how to make queues work with cron. The blog post is a must read.
Another Helpful Resource
PermalinkHashbangcode.com has a very well-written and helpful blog post on using the Drupal Queues API:
http://www.hashbangcode.com/blog/drupal-7-queues-api-579.html
How I can process result of perform Queues
PermalinkHi,
For instant, I have and perform some Queues processes in my_module. How I can show the report after all Queues in my module perform?
What your advice?
Alexey
I can't make it work i don't get it..
Permalinki don't know what is the problem, the worker function is working if i don't use the queue..
<?phpfunction MODULENAME_cron() {
$nodes=expired_nodes('type'); //a function that fetch the nodes id array i want
$queue = DrupalQueue::get('update_node');
foreach ($nodes as $row) {
$queue->createItem($row);
}
drupal_flush_all_caches();
}
function
MODULENAME_cron_queue_info() {$queues['update_node'] = array(
'worker callback' => 'MODULENAME_callback',
'time' => 30, // time in second for each worker
);
return $queues;
}
function
MODULENAME_callback($data){foreach ($data as $row) {
db_insert('field_data_field_SOMENAME')
->fields(array(
'entity_type' => 'node',
'bundle' => 'event',
'entity_id'=>$row,
'revision_id'=>$row,
'language'=>'und',
'delta'=>0,
'field_other_tid'=>196,
))
->execute();
db_insert('field_revision_field_SOMENAME')
->fields(array(
'entity_type' => 'node',
'bundle' => 'event',
'entity_id'=>$row,
'revision_id'=>$row,
'language'=>'und',
'delta'=>0,
'field_other_tid'=>196,
))
->execute();
}
}
?>
MODULENAME_callback($data)
PermalinkMODULENAME_callback($data)should be writtenMODULENAME_callback($item), where$itemis the$rowuse used to create a queue item with:$queue->createItem($row)Your callback will process one item at a time, that item can contain any data.
Claimed Objects disapper
PermalinkWhat I am seeing is that sometimes a claimed object disappears, that is the retrieved object pointer is valid upon first use, but then is null. Strange.
MongoDBQueue for a specific queue
PermalinkTo use MongoDB for a queue and not change the default, like this would:
# Message Queue
# $conf['queue_default_class'] = 'MongoDBQueue';
This works:
Assuming a queue named "my_test_queue"
either in the script, first do this:
global $conf;
$conf['queue_class_my_test_queue'] = 'MongoDBQueue';
or in settings.php
$conf['queue_class_my_test_queue'] = 'MongoDBQueue';
then
$queue = DrupalQueue::get('my_test_queue'); // will use your MongoDB
obviously, MongoDB needs to be configured and MongoDB queue module enabled.