Index: modules/system/system.info =================================================================== RCS file: /cvs/drupal/drupal/modules/system/system.info,v retrieving revision 1.11 diff -u -r1.11 system.info --- modules/system/system.info 12 Oct 2008 01:23:06 -0000 1.11 +++ modules/system/system.info 26 Apr 2009 00:54:33 -0000 @@ -6,6 +6,7 @@ core = 7.x files[] = system.module files[] = system.admin.inc +files[] = system.queue.inc files[] = image.gd.inc files[] = system.install required = TRUE Index: modules/system/system.module =================================================================== RCS file: /cvs/drupal/drupal/modules/system/system.module,v retrieving revision 1.683 diff -u -r1.683 system.module --- modules/system/system.module 25 Apr 2009 13:47:15 -0000 1.683 +++ modules/system/system.module 26 Apr 2009 00:54:35 -0000 @@ -1577,6 +1577,10 @@ foreach ($cache_tables as $table) { cache_clear_all(NULL, $table); } + + // Reset expired items in the default queue implementation table. If that's + // not used, no problems, the query will will be a no-op. + db_update('queue')->fields(array('process_id' => 0, 'expire' => 0))->condition('expire', time(), '<')->execute(); } /** Index: modules/system/system.test =================================================================== RCS file: /cvs/drupal/drupal/modules/system/system.test,v retrieving revision 1.40 diff -u -r1.40 system.test --- modules/system/system.test 31 Mar 2009 01:49:54 -0000 1.40 +++ modules/system/system.test 26 Apr 2009 00:54:36 -0000 @@ -878,7 +878,7 @@ 'node_admin_theme' => FALSE, ); $this->drupalPost('admin/build/themes', $edit, t('Save configuration')); - + $this->drupalGet('admin'); $this->assertRaw('themes/garland', t('Administration theme used on an administration page.')); @@ -887,7 +887,7 @@ // Reset to the default theme settings. $this->drupalPost('admin/build/themes', array(), t('Reset to defaults')); - + $this->drupalGet('admin'); $this->assertRaw('themes/garland', t('Site default theme used on administration page.')); @@ -895,3 +895,90 @@ $this->assertRaw('themes/garland', t('Site default theme used on the add content page.')); } } + + +/** + * Test the basic queue functionality. + */ +class QueueTestCase extends DrupalWebTestCase { + function getInfo() { + return array( + 'name' => t('Queue functionality'), + 'description' => t('Queues and dequeues a set of items to check the basic queue functionality.'), + 'group' => t('System'), + ); + } + + /** + * Queues and dequeues a set of items to check the basic queue functionality. + */ + function testQueue() { + // Create two queues. + module_invoke('system', 'queue_create_queue', $queue1 = $this->randomName()); + module_invoke('system', 'queue_create_queue', $queue2 = $this->randomName()); + + // Create four items. + $items = array(); + for ($i = 0; $i < 4; $i++) { + $items[] = array($this->randomName() => $this->randomName()); + } + + // Queue items 1 and 2 in the queue1. + module_invoke('system', 'queue_add_item', $queue1, $items[0]); + module_invoke('system', 'queue_add_item', $queue1, $items[1]); + + // Retrieve two items from queue1. + $entries = array(); + $new_items = array(); + + $entries[] = $entry = module_invoke('system', 'queue_reserve_item', $queue1); + $new_items[] = $entry->item; + + $entries[] = $entry = module_invoke('system', 'queue_reserve_item', $queue1); + $new_items[] = $entry->item; + + // First two dequeued items should match the first two items we queued. + $this->assertEqual($this->queueScore($items, $new_items), 2, t('Two items matched')); + + // Add two more items. + module_invoke('system', 'queue_add_item', $queue1, $items[2]); + module_invoke('system', 'queue_add_item', $queue1, $items[3]); + + $entries[] = $entry = module_invoke('system', 'queue_reserve_item', $queue1); + $new_items[] = $entry->item; + + $entries[] = $entry = module_invoke('system', 'queue_reserve_item', $queue1); + $new_items[] = $entry->item; + + // All dequeued items should match the items we queued exactly once, + // therefore the score must be exactly 4. + $this->assertEqual($this->queueScore($items, $new_items), 4, t('Four items matched')); + + // There should be no duplicate items. + $this->assertEqual($this->queueScore($new_items, $new_items), 4, t('Four items matched')); + + // Delete all items from queue1. + foreach ($entries as $entry) { + module_invoke('system', 'queue_delete_item', $entry); + } + + // Check that both queues are empty. + $this->assertFalse(module_invoke('system', 'queue_reserve_item', $queue1), t('Queue 1 is empty')); + $this->assertFalse(module_invoke('system', 'queue_reserve_item', $queue2), t('Queue 2 is empty')); + } + + /** + * This function returns the number of equal items in two arrays. + */ + function queueScore($items, $new_items) { + $score = 0; + foreach ($items as $item) { + foreach ($new_items as $new_item) { + if ($item === $new_item) { + $score++; + } + } + } + return $score; + } +} Index: modules/system/system.install =================================================================== RCS file: /cvs/drupal/drupal/modules/system/system.install,v retrieving revision 1.316 diff -u -r1.316 system.install --- modules/system/system.install 20 Apr 2009 02:23:16 -0000 1.316 +++ modules/system/system.install 26 Apr 2009 00:54:34 -0000 @@ -1041,6 +1041,67 @@ 'primary key' => array('mlid'), ); + $schema['queue'] = array( + 'description' => 'Stores items in queues.', + 'fields' => array( + 'item_id' => array( + 'type' => 'serial', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'Primary Key: Unique item ID.', + ), + 'queue_name' => array( + 'type' => 'varchar', + 'length' => 255, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The queue name.', + ), + 'process_id' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The ID of the dequeuing process.', + ), + 'item' => array( + 'type' => 'text', + 'not null' => FALSE, + 'size' => 'big', + 'serialize' => TRUE, + 'description' => 'The item itself.', + ), + 'expire' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The time the item needs reset.', + ), + 'created' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'When the item needs reset.', + ), + ), + 'primary key' => array('item_id'), + 'indexes' => array( + 'process_queue' => array('process_id', 'queue_name', 'created'), + 'process_expire' => array('process_id', 'expire'), + ), + ); + + $schema['queue_process_id'] = array( + 'description' => 'Stores queue process IDs, used to auto-incrament the process ID so that a unique process ID is used.', + 'fields' => array( + 'process_id' => array( + 'type' => 'serial', + 'not null' => TRUE, + 'description' => 'Primary Key: Unique process ID used to make sure only one consumer gets one item.', + ), + ), + 'primary key' => array('process_id'), + ); + $schema['registry'] = array( 'description' => "Each record is a function, class, or interface name and the file it is in.", 'fields' => array( @@ -3244,6 +3305,76 @@ } /** + * Add the queue tables. + */ +function system_update_7022() { + $schema['queue'] = array( + 'description' => 'Stores items in queues.', + 'fields' => array( + 'item_id' => array( + 'type' => 'serial', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'Primary Key: Unique item ID.', + ), + 'queue_name' => array( + 'type' => 'varchar', + 'length' => 255, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The queue name.', + ), + 'process_id' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The ID of the dequeuing process.', + ), + 'item' => array( + 'type' => 'text', + 'not null' => FALSE, + 'size' => 'big', + 'serialize' => TRUE, + 'description' => 'The item itself.', + ), + 'expire' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The time the item needs reset.', + ), + 'created' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'When the item needs reset.', + ), + ), + 'primary key' => array('item_id'), + 'indexes' => array( + 'process_queue' => array('process_id', 'queue_name', 'created'), + 'process_expire' => array('process_id', 'expire'), + ), + ); + + $schema['queue_process_id'] = array( + 'description' => 'Stores queue process IDs, used to auto-incrament the process ID so that a unique process ID is used.', + 'fields' => array( + 'process_id' => array( + 'type' => 'serial', + 'not null' => TRUE, + 'description' => 'Primary Key: Unique process ID used to make sure only one consumer gets one item.', + ), + ), + 'primary key' => array('process_id'), + ); + db_create_table($ret, 'queue', $schema['queue']); + db_create_table($ret, 'queue_process_id', $schema['queue_process_id']); + + return $ret; +} + +/** * @} End of "defgroup updates-6.x-to-7.x" * The next series of updates should start at 8000. */ Index: modules/system/system.queue.inc =================================================================== RCS file: modules/system/system.queue.inc diff -N modules/system/system.queue.inc --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ modules/system/system.queue.inc 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,221 @@ +addItem($item); +} + +/** + * Reserve an item in a queue for processing. + * + * @param $queue_name + * Arbitrary string. The name of the queue to work with. + * @param $process_time + * How long the processing is expected to take in seconds, defaults to an + * hour. After this time passes, the item will be reset and another process + * can reserve the item. + * @return + * A queue entry which is an object. The 'item' property contains the $item + * which is what was passed to system_queue_add_item(). This queue entry + * needs to be passed to system_queue_delete_item() once processing is + * completed. + */ +function system_queue_reserve_item($queue_name, $process_time = 3600) { + return _system_get_queue($queue_name)->reserveItem($process_time); +} + +/** + * Delete a finished entry from the queue. + * + * @param $entry + * The entry object returned by system_queue_reserve_item(). + */ +function system_queue_delete_item($entry) { + _system_get_queue($entry->queue_name)->deleteItem($entry->item_id); +} + +/** + * Create a queue. Called during installation and should be used to perform + * any necessary initialization operations. + * + * @param $queue_name + * Arbitrary string. The name of the queue to work with. + */ +function system_queue_create_queue($queue_name) { + _system_get_queue($queue_name)->createQueue(); +} + +/** + * Remove a queue and every item in the queue. + * + * @param $queue_name + * Arbitrary string. The name of the queue to work with. + */ +function system_queue_remove_queue($queue_name) { + _system_get_queue($queue_name)->removeQueue(); +} + +/** + * Default queue implementation. + * + * Do not use this class directly, use the system_queue_add_item(), + * system_queue_reserve_item(), system_queue_delete_item() and + * system_queue_remove_queue() functions. + */ +class systemQueue implements DrupalQueue { + protected $process_id; + + protected $queue_name; + + function __construct($queue_name) { + $this->queue_name = $queue_name; + } + + function addItem($item) { + $record->queue_name = $this->queue_name; + $record->item = $item; + $record->process_id = 0; + return drupal_write_record('queue', $record) !== FALSE; + } + + function reserveItem($process_time = 30) { + if (!isset($this->process_id)) { + $this->process_id = db_insert('queue_process_id')->useDefaults(array('process_id'))->execute(); + } + $start = time(); + $entry = FALSE; + $entry = db_query_range('SELECT item, item_id, queue_name FROM {queue} q WHERE process_id = 0 AND queue_name = :queue_name ORDER BY created ASC', array(':queue_name' => $this->queue_name), 0, 1)->fetchObject(); + if ($entry) { + // Try to mark the item as ours. + $update = db_update('queue') + ->fields(array('process_id' => $this->process_id, 'expire' => time() + $process_time)) + ->condition('item_id', $entry->item_id) + ->condition('process_id', 0); + // If there are affected rows, this update succeeded. + if ($update->execute()) { + $entry->item = unserialize($entry->item); + return $entry; + } + } + } + + function deleteItem($item_id) { + db_delete('queue')->condition('item_id', $item_id); + } + + function createQueue() { + // Our queues are created automatically on demand. + } + + function removeQueue() { + db_delete('queue')->condition('queue_name', $this->queue_name)->execute(); + } +} + +/** + * @} End of "defgroup queue". + */