1. 8.2.x core/core.api.php queue
  2. 8.0.x core/core.api.php queue
  3. 8.1.x core/core.api.php queue
  4. 8.3.x core/core.api.php queue
  5. 7.x modules/system/system.queue.inc queue

Queue items to allow later processing.

The queue system allows placing items in a queue and processing them later. The system tries to ensure that only one consumer can process an item.

Before a queue can be used it needs to be created by DrupalQueueInterface::createQueue().

Items can be added to the queue by passing an arbitrary data object to DrupalQueueInterface::createItem().

To process an item, call DrupalQueueInterface::claimItem() and specify how long you want to have a lease for working on that item. When finished processing, the item needs to be deleted by calling DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be made available again by the DrupalQueueInterface implementation once the lease expires. Another consumer will then be able to receive it when calling DrupalQueueInterface::claimItem(). Due to this, the processing code should be aware that an item might be handed over for processing more than once.

The $item object used by the DrupalQueueInterface can contain arbitrary metadata depending on the implementation. Systems using the interface should only rely on the data property which will contain the information passed to DrupalQueueInterface::createItem(). The full queue item returned by DrupalQueueInterface::claimItem() needs to be passed to DrupalQueueInterface::deleteItem() once processing is completed.

There are two kinds of queue backends available: reliable, which preserves the order of messages and guarantees that every item will be executed at least once. The non-reliable kind only does a best effort to preserve order in messages and to execute them at least once but there is a small chance that some items get lost. For example, some distributed back-ends like Amazon SQS will be managing jobs for a large set of producers and consumers where a strict FIFO ordering will likely not be preserved. Another example would be an in-memory queue backend which might lose items if it crashes. However, such a backend would be able to deal with significantly more writes than a reliable queue and for many tasks this is more important. See aggregator_cron() for an example of how to effectively utilize a non-reliable queue. Another example is doing Twitter statistics -- the small possibility of losing a few items is insignificant next to power of the queue being able to keep up with writes. As described in the processing section, regardless of the queue being reliable or not, the processing code should be aware that an item might be handed over for processing more than once (because the processing code might time out before it finishes).

File

modules/system/system.queue.inc, line 8
Queue functionality.

Classes

Namesort descending Location Description
DrupalQueue modules/system/system.queue.inc Factory class for interacting with queues.
MemoryQueue modules/system/system.queue.inc Static queue implementation.
SystemQueue modules/system/system.queue.inc Default queue implementation.

Interfaces

Namesort descending Location Description
DrupalQueueInterface modules/system/system.queue.inc
DrupalReliableQueueInterface modules/system/system.queue.inc Reliable queue interface.

Comments

coltrane’s picture

You first call DrupalQueue::get() to register the queue, like so:

$queue = DrupalQueue::get('example_queue');
$queue->createQueue();

Then you can create items etc.

Shiny’s picture

Example of to create items:

  $queue = DrupalQueue::get('aggregator_feeds');
  foreach ($result as $feed) {
    if ($queue->createItem($feed)) {
      // Add timestamp to avoid queueing item more than once.
      db_update('aggregator_feed')
        ->fields(array('queued' => REQUEST_TIME))
        ->condition('fid', $feed->fid)
        ->execute();
    }
  }
claudiu.cristea’s picture

Here's Views Queue, a module that helps administrators to expose the SystemQueue in Views and perform VBO operations against the queue.

Link: http://drupal.org/project/views_queue

gcassie’s picture

To get a sense of how queues work, try out http://d7.drupalexamples.info/queue_example/insert_remove .

sethviebrock’s picture

For queues run during cron, check out http://api.drupal.org/api/drupal/modules--system--system.api.php/functio...
And this example on Level Ten's blog.

jacobson’s picture

Until I read the L10 blog post with a simple example implementation of a queue, I could not figure out how to make queues work with cron. The blog post is a must read.

fuerst’s picture

jacobson’s picture

Hashbangcode.com has a very well-written and helpful blog post on using the Drupal Queues API:

http://www.hashbangcode.com/blog/drupal-7-queues-api-579.html

dmstru’s picture

Hi,

For instant, I have and perform some Queues processes in my_module. How I can show the report after all Queues in my module perform?

What your advice?

Alexey

tsotsos’s picture

i don't know what is the problem, the worker function is working if i don't use the queue..

<?php
function MODULENAME_cron() {
$nodes=expired_nodes('type'); //a function that fetch the nodes id array i want
  $queue = DrupalQueue::get('update_node');
  foreach ($nodes as $row) {
    $queue->createItem($row);
  }
  drupal_flush_all_caches();
}

function MODULENAME_cron_queue_info() {
  $queues['update_node'] = array(
    'worker callback' => 'MODULENAME_callback',
    'time' => 30, // time in second for each worker
  );
  return $queues;
}

function MODULENAME_callback($data){
      foreach ($data as $row) {
db_insert('field_data_field_SOMENAME')
  ->fields(array(
    'entity_type' => 'node',
    'bundle' => 'event',
    'entity_id'=>$row,
    'revision_id'=>$row,
    'language'=>'und',
    'delta'=>0,
    'field_other_tid'=>196,
  ))
  ->execute();
db_insert('field_revision_field_SOMENAME')
  ->fields(array(
    'entity_type' => 'node',
    'bundle' => 'event',
    'entity_id'=>$row,
    'revision_id'=>$row,
    'language'=>'und',
    'delta'=>0,
    'field_other_tid'=>196,
  ))
    ->execute();
  }
}
?>
arosboro’s picture

MODULENAME_callback($data) should be written MODULENAME_callback($item), where $item is the $row use used to create a queue item with: $queue->createItem($row)

Your callback will process one item at a time, that item can contain any data.

hoporr’s picture

What I am seeing is that sometimes a claimed object disappears, that is the retrieved object pointer is valid upon first use, but then is null. Strange.

js’s picture

To use MongoDB for a queue and not change the default, like this would:

# Message Queue
# $conf['queue_default_class'] = 'MongoDBQueue';

This works:

Assuming a queue named "my_test_queue"
either in the script, first do this:

global $conf;
$conf['queue_class_my_test_queue'] = 'MongoDBQueue';

or in settings.php

$conf['queue_class_my_test_queue'] = 'MongoDBQueue';

then

$queue = DrupalQueue::get('my_test_queue'); // will use your MongoDB

obviously, MongoDB needs to be configured and MongoDB queue module enabled.

sphism’s picture

Not hugely on topic but is "queue" actually an american english word? If not then all of this breaks drupal's coding standards.

eric.goodwin’s picture

Yes, "queue" is in general use outside of lines in British English. It refers specifically to the this exact situation where a list of tasks are maintained in first-in-first-out order. If the order was last-in-first-out then it would be referred to as a "stack".

Adding an item to the list can be referred to as "enqueuing" and the reverse is "dequeuing". For a stack to terms are "push" and "pop" respectively.

D2ev’s picture

how to find items in a queue using queue name in code. I have used drush command 'drush queue-list' which shows list of queues with size. I want same in code.

D2ev’s picture

Found my answer -

$name = 'revision_scheduler';
$q = DrupalQueue::get($name);
dsm($q->numberOfItems());
artreaktor’s picture

If you have some problems with your queues Queue UI module can help.

jacobischwartz’s picture

What's a good way to deal with failing queues, where workers are crashing and burning?

It looks like a worker can throw an exception, which causes the item to be re-released continually into the queue after the lease has expired. But if you're getting a lot of failures (maybe an external service is failing), it seems like this might clog the queue pipeline (compounding the issue and impacting performance for other queue runs too). Is there a way to configure max retries and shunt items to a hospital queue?

Elijah Lynn’s picture

Decent info on the SystemQueue, MemoryQueue plus a touch on BatchQueue and BatchMemoryQueue. Seems to still be relevant especially touching on the fact that the ::createQueue() in SystemQueue and MemoryQueue is a no-op and only needs to be called if you are making your own queue.

update: http://www.ent.iastate.edu/it/Batch_and_Queue.pdf

Elijah Lynn’s picture

kunal.kursija’s picture

Thanks, Great write-up.