system.queue.inc

You are here

Queue functionality.

Classes

Namesort descending Description
DrupalQueue Factory class for interacting with queues.
MemoryQueue Static queue implementation.
SystemQueue Default queue implementation.

Interfaces

Namesort descending Description
DrupalQueueInterface
DrupalReliableQueueInterface Reliable queue interface.

File

modules/system/system.queue.inc
View source
  1. <?php
  2. /**
  3. * @file
  4. * Queue functionality.
  5. */
  6. /**
  7. * @defgroup queue Queue operations
  8. * @{
  9. * Queue items to allow later processing.
  10. *
  11. * The queue system allows placing items in a queue and processing them later.
  12. * The system tries to ensure that only one consumer can process an item.
  13. *
  14. * Before a queue can be used it needs to be created by
  15. * DrupalQueueInterface::createQueue().
  16. *
  17. * Items can be added to the queue by passing an arbitrary data object to
  18. * DrupalQueueInterface::createItem().
  19. *
  20. * To process an item, call DrupalQueueInterface::claimItem() and specify how
  21. * long you want to have a lease for working on that item. When finished
  22. * processing, the item needs to be deleted by calling
  23. * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
  24. * made available again by the DrupalQueueInterface implementation once the
  25. * lease expires. Another consumer will then be able to receive it when calling
  26. * DrupalQueueInterface::claimItem(). Due to this, the processing code should
  27. * be aware that an item might be handed over for processing more than once.
  28. *
  29. * The $item object used by the DrupalQueueInterface can contain arbitrary
  30. * metadata depending on the implementation. Systems using the interface should
  31. * only rely on the data property which will contain the information passed to
  32. * DrupalQueueInterface::createItem(). The full queue item returned by
  33. * DrupalQueueInterface::claimItem() needs to be passed to
  34. * DrupalQueueInterface::deleteItem() once processing is completed.
  35. *
  36. * There are two kinds of queue backends available: reliable, which preserves
  37. * the order of messages and guarantees that every item will be executed at
  38. * least once. The non-reliable kind only does a best effort to preserve order
  39. * in messages and to execute them at least once but there is a small chance
  40. * that some items get lost. For example, some distributed back-ends like
  41. * Amazon SQS will be managing jobs for a large set of producers and consumers
  42. * where a strict FIFO ordering will likely not be preserved. Another example
  43. * would be an in-memory queue backend which might lose items if it crashes.
  44. * However, such a backend would be able to deal with significantly more writes
  45. * than a reliable queue and for many tasks this is more important. See
  46. * aggregator_cron() for an example of how to effectively utilize a
  47. * non-reliable queue. Another example is doing Twitter statistics -- the small
  48. * possibility of losing a few items is insignificant next to power of the
  49. * queue being able to keep up with writes. As described in the processing
  50. * section, regardless of the queue being reliable or not, the processing code
  51. * should be aware that an item might be handed over for processing more than
  52. * once (because the processing code might time out before it finishes).
  53. */
  54. /**
  55. * Factory class for interacting with queues.
  56. */
  57. class DrupalQueue {
  58. /**
  59. * Returns the queue object for a given name.
  60. *
  61. * The following variables can be set by variable_set or $conf overrides:
  62. * - queue_class_$name: the class to be used for the queue $name.
  63. * - queue_default_class: the class to use when queue_class_$name is not
  64. * defined. Defaults to SystemQueue, a reliable backend using SQL.
  65. * - queue_default_reliable_class: the class to use when queue_class_$name is
  66. * not defined and the queue_default_class is not reliable. Defaults to
  67. * SystemQueue.
  68. *
  69. * @param $name
  70. * Arbitrary string. The name of the queue to work with.
  71. * @param $reliable
  72. * TRUE if the ordering of items and guaranteeing every item executes at
  73. * least once is important, FALSE if scalability is the main concern.
  74. *
  75. * @return
  76. * The queue object for a given name.
  77. */
  78. public static function get($name, $reliable = FALSE) {
  79. static $queues;
  80. if (!isset($queues[$name])) {
  81. $class = variable_get('queue_class_' . $name, NULL);
  82. if (!$class) {
  83. $class = variable_get('queue_default_class', 'SystemQueue');
  84. }
  85. $object = new $class($name);
  86. if ($reliable && !$object instanceof DrupalReliableQueueInterface) {
  87. $class = variable_get('queue_default_reliable_class', 'SystemQueue');
  88. $object = new $class($name);
  89. }
  90. $queues[$name] = $object;
  91. }
  92. return $queues[$name];
  93. }
  94. }
  95. interface DrupalQueueInterface {
  96. /**
  97. * Add a queue item and store it directly to the queue.
  98. *
  99. * @param $data
  100. * Arbitrary data to be associated with the new task in the queue.
  101. * @return
  102. * TRUE if the item was successfully created and was (best effort) added
  103. * to the queue, otherwise FALSE. We don't guarantee the item was
  104. * committed to disk etc, but as far as we know, the item is now in the
  105. * queue.
  106. */
  107. public function createItem($data);
  108. /**
  109. * Retrieve the number of items in the queue.
  110. *
  111. * This is intended to provide a "best guess" count of the number of items in
  112. * the queue. Depending on the implementation and the setup, the accuracy of
  113. * the results of this function may vary.
  114. *
  115. * e.g. On a busy system with a large number of consumers and items, the
  116. * result might only be valid for a fraction of a second and not provide an
  117. * accurate representation.
  118. *
  119. * @return
  120. * An integer estimate of the number of items in the queue.
  121. */
  122. public function numberOfItems();
  123. /**
  124. * Claim an item in the queue for processing.
  125. *
  126. * @param $lease_time
  127. * How long the processing is expected to take in seconds, defaults to an
  128. * hour. After this lease expires, the item will be reset and another
  129. * consumer can claim the item. For idempotent tasks (which can be run
  130. * multiple times without side effects), shorter lease times would result
  131. * in lower latency in case a consumer fails. For tasks that should not be
  132. * run more than once (non-idempotent), a larger lease time will make it
  133. * more rare for a given task to run multiple times in cases of failure,
  134. * at the cost of higher latency.
  135. * @return
  136. * On success we return an item object. If the queue is unable to claim an
  137. * item it returns false. This implies a best effort to retrieve an item
  138. * and either the queue is empty or there is some other non-recoverable
  139. * problem.
  140. */
  141. public function claimItem($lease_time = 3600);
  142. /**
  143. * Delete a finished item from the queue.
  144. *
  145. * @param $item
  146. * The item returned by DrupalQueueInterface::claimItem().
  147. */
  148. public function deleteItem($item);
  149. /**
  150. * Release an item that the worker could not process, so another
  151. * worker can come in and process it before the timeout expires.
  152. *
  153. * @param $item
  154. * @return boolean
  155. */
  156. public function releaseItem($item);
  157. /**
  158. * Create a queue.
  159. *
  160. * Called during installation and should be used to perform any necessary
  161. * initialization operations. This should not be confused with the
  162. * constructor for these objects, which is called every time an object is
  163. * instantiated to operate on a queue. This operation is only needed the
  164. * first time a given queue is going to be initialized (for example, to make
  165. * a new database table or directory to hold tasks for the queue -- it
  166. * depends on the queue implementation if this is necessary at all).
  167. */
  168. public function createQueue();
  169. /**
  170. * Delete a queue and every item in the queue.
  171. */
  172. public function deleteQueue();
  173. }
  174. /**
  175. * Reliable queue interface.
  176. *
  177. * Classes implementing this interface preserve the order of messages and
  178. * guarantee that every item will be executed at least once.
  179. */
  180. interface DrupalReliableQueueInterface extends DrupalQueueInterface {
  181. }
  182. /**
  183. * Default queue implementation.
  184. */
  185. class SystemQueue implements DrupalReliableQueueInterface {
  186. /**
  187. * The name of the queue this instance is working with.
  188. *
  189. * @var string
  190. */
  191. protected $name;
  192. public function __construct($name) {
  193. $this->name = $name;
  194. }
  195. public function createItem($data) {
  196. // During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
  197. // the queue table yet, so we cannot rely on drupal_write_record().
  198. $query = db_insert('queue')
  199. ->fields(array(
  200. 'name' => $this->name,
  201. 'data' => serialize($data),
  202. // We cannot rely on REQUEST_TIME because many items might be created
  203. // by a single request which takes longer than 1 second.
  204. 'created' => time(),
  205. ));
  206. return (bool) $query->execute();
  207. }
  208. public function numberOfItems() {
  209. return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
  210. }
  211. public function claimItem($lease_time = 30) {
  212. // Claim an item by updating its expire fields. If claim is not successful
  213. // another thread may have claimed the item in the meantime. Therefore loop
  214. // until an item is successfully claimed or we are reasonably sure there
  215. // are no unclaimed items left.
  216. while (TRUE) {
  217. $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
  218. if ($item) {
  219. // Try to update the item. Only one thread can succeed in UPDATEing the
  220. // same row. We cannot rely on REQUEST_TIME because items might be
  221. // claimed by a single consumer which runs longer than 1 second. If we
  222. // continue to use REQUEST_TIME instead of the current time(), we steal
  223. // time from the lease, and will tend to reset items before the lease
  224. // should really expire.
  225. $update = db_update('queue')
  226. ->fields(array(
  227. 'expire' => time() + $lease_time,
  228. ))
  229. ->condition('item_id', $item->item_id)
  230. ->condition('expire', 0);
  231. // If there are affected rows, this update succeeded.
  232. if ($update->execute()) {
  233. $item->data = unserialize($item->data);
  234. return $item;
  235. }
  236. }
  237. else {
  238. // No items currently available to claim.
  239. return FALSE;
  240. }
  241. }
  242. }
  243. public function releaseItem($item) {
  244. $update = db_update('queue')
  245. ->fields(array(
  246. 'expire' => 0,
  247. ))
  248. ->condition('item_id', $item->item_id);
  249. return $update->execute();
  250. }
  251. public function deleteItem($item) {
  252. db_delete('queue')
  253. ->condition('item_id', $item->item_id)
  254. ->execute();
  255. }
  256. public function createQueue() {
  257. // All tasks are stored in a single database table (which is created when
  258. // Drupal is first installed) so there is nothing we need to do to create
  259. // a new queue.
  260. }
  261. public function deleteQueue() {
  262. db_delete('queue')
  263. ->condition('name', $this->name)
  264. ->execute();
  265. }
  266. }
  267. /**
  268. * Static queue implementation.
  269. *
  270. * This allows "undelayed" variants of processes relying on the Queue
  271. * interface. The queue data resides in memory. It should only be used for
  272. * items that will be queued and dequeued within a given page request.
  273. */
  274. class MemoryQueue implements DrupalQueueInterface {
  275. /**
  276. * The queue data.
  277. *
  278. * @var array
  279. */
  280. protected $queue;
  281. /**
  282. * Counter for item ids.
  283. *
  284. * @var int
  285. */
  286. protected $id_sequence;
  287. /**
  288. * Start working with a queue.
  289. *
  290. * @param $name
  291. * Arbitrary string. The name of the queue to work with.
  292. */
  293. public function __construct($name) {
  294. $this->queue = array();
  295. $this->id_sequence = 0;
  296. }
  297. public function createItem($data) {
  298. $item = new stdClass();
  299. $item->item_id = $this->id_sequence++;
  300. $item->data = $data;
  301. $item->created = time();
  302. $item->expire = 0;
  303. $this->queue[$item->item_id] = $item;
  304. }
  305. public function numberOfItems() {
  306. return count($this->queue);
  307. }
  308. public function claimItem($lease_time = 30) {
  309. foreach ($this->queue as $key => $item) {
  310. if ($item->expire == 0) {
  311. $item->expire = time() + $lease_time;
  312. $this->queue[$key] = $item;
  313. return $item;
  314. }
  315. }
  316. return FALSE;
  317. }
  318. public function deleteItem($item) {
  319. unset($this->queue[$item->item_id]);
  320. }
  321. public function releaseItem($item) {
  322. if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
  323. $this->queue[$item->item_id]->expire = 0;
  324. return TRUE;
  325. }
  326. return FALSE;
  327. }
  328. public function createQueue() {
  329. // Nothing needed here.
  330. }
  331. public function deleteQueue() {
  332. $this->queue = array();
  333. $this->id_sequence = 0;
  334. }
  335. }
  336. /**
  337. * @} End of "defgroup queue".
  338. */