Queue operations

Queue items to allow later processing.

The queue system allows placing items in a queue and processing them later. The system tries to ensure that only one consumer can process an item.

Before a queue can be used it needs to be created by DrupalQueueInterface::createQueue().

Items can be added to the queue by passing an arbitrary data object to DrupalQueueInterface::createItem().

To process an item, call DrupalQueueInterface::claimItem() and specify how long you want to have a lease for working on that item. When finished processing, the item needs to be deleted by calling DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be made available again by the DrupalQueueInterface implementation once the lease expires. Another consumer will then be able to receive it when calling DrupalQueueInterface::claimItem(). Due to this, the processing code should be aware that an item might be handed over for processing more than once.

The $item object used by the DrupalQueueInterface can contain arbitrary metadata depending on the implementation. Systems using the interface should only rely on the data property which will contain the information passed to DrupalQueueInterface::createItem(). The full queue item returned by DrupalQueueInterface::claimItem() needs to be passed to DrupalQueueInterface::deleteItem() once processing is completed.
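The claim, process, and delete cycle described above can be sketched roughly as follows. This is a minimal illustration, not the code Drupal itself runs: example_drain_queue() and the $worker callable are hypothetical names, and $queue is assumed to be an object implementing DrupalQueueInterface, such as the one returned by DrupalQueue::get().

```php
<?php
/**
 * Claims and processes items until the queue is empty or time runs out.
 *
 * Hypothetical helper. $queue is assumed to implement DrupalQueueInterface;
 * $worker is any callable that accepts the data of one item.
 */
function example_drain_queue($queue, $worker, $lease = 60, $time_limit = 30) {
  $processed = 0;
  $end = time() + $time_limit;
  while (time() < $end && ($item = $queue->claimItem($lease))) {
    try {
      // Rely only on the data property; other properties vary by backend.
      call_user_func($worker, $item->data);
      // Pass the full claimed item back, not just its data.
      $queue->deleteItem($item);
      $processed++;
    }
    catch (Exception $e) {
      // Hand the item back now instead of waiting for the lease to expire,
      // then stop so the same failing item is not claimed again in a loop.
      $queue->releaseItem($item);
      break;
    }
  }
  return $processed;
}
```

A caller might use it as example_drain_queue(DrupalQueue::get('example_queue'), 'example_worker'), where both names are placeholders. If the process dies before deleteItem() is reached, nothing else needs to happen: the lease expires and another consumer can claim the item.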

There are two kinds of queue backends available: reliable and non-reliable. A reliable backend preserves the order of messages and guarantees that every item will be executed at least once. A non-reliable backend makes only a best effort to preserve the order of messages and to execute them at least once, so there is a small chance that some items get lost. For example, some distributed backends like Amazon SQS manage jobs for a large set of producers and consumers, where strict FIFO ordering will likely not be preserved. Another example is an in-memory queue backend, which might lose items if it crashes. However, such a backend can handle significantly more writes than a reliable queue, and for many tasks this is more important. See aggregator_cron() for an example of how to effectively utilize a non-reliable queue. Another example is collecting Twitter statistics: the small possibility of losing a few items is insignificant next to the power of the queue being able to keep up with writes.

As described in the processing section, regardless of whether the queue is reliable, the processing code should be aware that an item might be handed over for processing more than once (because the processing code might time out before it finishes).
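Because an item can be delivered more than once, one common approach is to make the worker safe to repeat. The sketch below is only an illustration of that idea: example_process_once() is a hypothetical helper, and the $done array stands in for durable storage (a real module would more likely record handled keys in a database table).

```php
<?php
/**
 * Runs $callback for $key unless that key was already handled.
 *
 * Hypothetical helper. $done is an in-memory stand-in for durable storage,
 * used here only to keep the sketch self-contained.
 */
function example_process_once(array &$done, $key, $callback) {
  if (isset($done[$key])) {
    // The item was delivered again after it had already been processed.
    return FALSE;
  }
  call_user_func($callback, $key);
  // Mark the key only after the callback succeeds, so that a failed
  // attempt is retried on the next delivery.
  $done[$key] = TRUE;
  return TRUE;
}
```

The key has to identify the work itself (for example a node ID), not the queue item, since a redelivered item may carry different backend metadata.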

Classes

Name          Location                          Description
DrupalQueue   modules/system/system.queue.inc   Factory class for interacting with queues.
MemoryQueue   modules/system/system.queue.inc   Static queue implementation.
SystemQueue   modules/system/system.queue.inc   Default queue implementation.

Interfaces

Name                           Location                          Description
DrupalQueueInterface           modules/system/system.queue.inc
DrupalReliableQueueInterface   modules/system/system.queue.inc   Reliable queue interface.

File

modules/system/system.queue.inc, line 8
Queue functionality.

Comments

You first call DrupalQueue::get() to register the queue, like so:

<?php
$queue = DrupalQueue::get('example_queue');
$queue->createQueue();
?>

Then you can create items etc.

Example of how to create items:

<?php
  $queue = DrupalQueue::get('aggregator_feeds');
  foreach ($result as $feed) {
    if ($queue->createItem($feed)) {
      // Add timestamp to avoid queueing item more than once.
      db_update('aggregator_feed')
        ->fields(array('queued' => REQUEST_TIME))
        ->condition('fid', $feed->fid)
        ->execute();
    }
  }
?>

Here's Views Queue, a module that helps administrators to expose the SystemQueue in Views and perform VBO operations against the queue.

Link: http://drupal.org/project/views_queue

To get a sense of how queues work, try out http://d7.drupalexamples.info/queue_example/insert_remove .

For queues run during cron, check out http://api.drupal.org/api/drupal/modules--system--system.api.php/functio...
And this example on Level Ten's blog.

Until I read the L10 blog post with a simple example implementation of a queue, I could not figure out how to make queues work with cron. The blog post is a must read.

Hashbangcode.com has a very well-written and helpful blog post on using the Drupal Queues API:

http://www.hashbangcode.com/blog/drupal-7-queues-api-579.html

Hi,

For instance, I run some queue processes in my_module. How can I show a report after all the queues in my module have finished?

What is your advice?

Alexey

I don't know what the problem is; the worker function works if I don't use the queue.

<?php
function MODULENAME_cron() {
  // A function that fetches the array of node IDs I want.
  $nodes = expired_nodes('type');

  $queue = DrupalQueue::get('update_node');
  foreach ($nodes as $row) {
    $queue->createItem($row);
  }

  drupal_flush_all_caches();
}

function MODULENAME_cron_queue_info() {
  $queues['update_node'] = array(
    'worker callback' => 'MODULENAME_callback',
    'time' => 30, // Time in seconds for each worker.
  );
  return $queues;
}

function MODULENAME_callback($data) {
  foreach ($data as $row) {
    db_insert('field_data_field_SOMENAME')
      ->fields(array(
        'entity_type' => 'node',
        'bundle' => 'event',
        'entity_id' => $row,
        'revision_id' => $row,
        'language' => 'und',
        'delta' => 0,
        'field_other_tid' => 196,
      ))
      ->execute();
    db_insert('field_revision_field_SOMENAME')
      ->fields(array(
        'entity_type' => 'node',
        'bundle' => 'event',
        'entity_id' => $row,
        'revision_id' => $row,
        'language' => 'und',
        'delta' => 0,
        'field_other_tid' => 196,
      ))
      ->execute();
  }
}
?>

MODULENAME_callback($data) should be written MODULENAME_callback($item), where $item is the $row you used to create a queue item with $queue->createItem($row).

Your callback will process one item at a time; that item can contain any data.
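A possible corrected version of the worker from the question, as a sketch only. Cron calls the worker callback once per queue item and passes it the data that was given to createItem(), so there is nothing to loop over. The function names example_field_row() and example_update_node_worker() are hypothetical; the table names and column values are copied unchanged from the question and have not been verified.

```php
<?php
/**
 * Builds the column values for one queued node ID (hypothetical helper).
 */
function example_field_row($nid) {
  return array(
    'entity_type' => 'node',
    'bundle' => 'event',
    'entity_id' => $nid,
    'revision_id' => $nid,
    'language' => 'und',
    'delta' => 0,
    'field_other_tid' => 196,
  );
}

/**
 * Worker callback: receives ONE node ID per call, not the whole array.
 */
function example_update_node_worker($nid) {
  $row = example_field_row($nid);
  // Same two inserts as in the question, without the surrounding foreach.
  $tables = array('field_data_field_SOMENAME', 'field_revision_field_SOMENAME');
  foreach ($tables as $table) {
    db_insert($table)->fields($row)->execute();
  }
}
```

With this shape, the 'worker callback' entry in the cron queue info array would name example_update_node_worker instead of MODULENAME_callback.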

What I am seeing is that sometimes a claimed object disappears: the retrieved object pointer is valid upon first use, but then is null. Strange.

To use MongoDB for a single queue without changing the default, as this setting would:

# Message Queue
# $conf['queue_default_class'] = 'MongoDBQueue';

This works:

Assuming a queue named "my_test_queue", either do this first in the script:

global $conf;
$conf['queue_class_my_test_queue'] = 'MongoDBQueue';

or put this in settings.php:

$conf['queue_class_my_test_queue'] = 'MongoDBQueue';

Then:

$queue = DrupalQueue::get('my_test_queue'); // will use your MongoDB

Obviously, MongoDB needs to be configured and the MongoDB queue module enabled.
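The per-queue override works because the queue class is looked up by queue name before falling back to the default. The sketch below mimics that lookup order with a plain array so it can be read on its own. It is a simplified illustration, not Drupal's actual code: example_queue_class() is a hypothetical function, and the extra handling for reliable queues is left out.

```php
<?php
/**
 * Returns the queue class that would be chosen for $name.
 *
 * Hypothetical, simplified lookup: $conf stands in for the settings array.
 */
function example_queue_class(array $conf, $name) {
  // A per-queue setting wins over the site-wide default.
  if (!empty($conf['queue_class_' . $name])) {
    return $conf['queue_class_' . $name];
  }
  // Otherwise use the configured default, or SystemQueue if none is set.
  return isset($conf['queue_default_class']) ? $conf['queue_default_class'] : 'SystemQueue';
}
```

With $conf['queue_class_my_test_queue'] set to 'MongoDBQueue', only "my_test_queue" resolves to MongoDBQueue; every other queue name still resolves to the default class.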