=== modified file 'modules/system/system.info' --- modules/system/system.info 2008-10-12 01:23:01 +0000 +++ modules/system/system.info 2009-04-24 23:02:17 +0000 @@ -6,6 +6,7 @@ version = VERSION core = 7.x files[] = system.module files[] = system.admin.inc +files[] = system.queue.inc files[] = image.gd.inc files[] = system.install required = TRUE === modified file 'modules/system/system.install' --- modules/system/system.install 2009-04-20 02:23:15 +0000 +++ modules/system/system.install 2009-04-25 13:48:49 +0000 @@ -1041,6 +1041,67 @@ function system_schema() { 'primary key' => array('mlid'), ); + $schema['queue'] = array( + 'description' => 'Stores items in queues.', + 'fields' => array( + 'item_id' => array( + 'type' => 'serial', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'Primary Key: Unique item ID.', + ), + 'queue_name' => array( + 'type' => 'varchar', + 'length' => 255, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The queue name.', + ), + 'process_id' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The ID of the dequeuing process.', + ), + 'item' => array( + 'type' => 'text', + 'not null' => FALSE, + 'size' => 'big', + 'serialize' => TRUE, + 'description' => 'The item itself.', + ), + 'expire' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The time the item needs reset.', + ), + 'created' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'When the item needs reset.', + ), + ), + 'primary key' => array('item_id'), + 'indexes' => array( + 'process_queue' => array('process_id', 'queue_name', 'created'), + 'process_expire' => array('process_id', 'expire'), + ), + ); + + $schema['queue_process_id'] = array( + 'description' => 'Stores queue process IDs, used to auto-incrament the process ID so that a unique process ID is used.', + 'fields' => array( + 'process_id' => array( + 'type' => 'serial', + 'not null' => TRUE, + 'description' => 'Primary Key: Unique process ID used to make sure only one consumer gets one item.', + ), + ), + 'primary key' => array('process_id'), + ); + $schema['registry'] = array( 'description' => "Each record is a function, class, or interface name and the file it is in.", 'fields' => array( @@ -3244,6 +3305,76 @@ function system_update_7020() { } /** + * Add the queue tables. + */ +function system_update_7022() { + $schema['queue'] = array( + 'description' => 'Stores items in queues.', + 'fields' => array( + 'item_id' => array( + 'type' => 'serial', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'Primary Key: Unique item ID.', + ), + 'queue_name' => array( + 'type' => 'varchar', + 'length' => 255, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The queue name.', + ), + 'process_id' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The ID of the dequeuing process.', + ), + 'item' => array( + 'type' => 'text', + 'not null' => FALSE, + 'size' => 'big', + 'serialize' => TRUE, + 'description' => 'The item itself.', + ), + 'expire' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The time the item needs reset.', + ), + 'created' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'When the item needs reset.', + ), + ), + 'primary key' => array('item_id'), + 'indexes' => array( + 'process_queue' => array('process_id', 'queue_name', 'created'), + 'process_expire' => array('process_id', 'expire'), + ), + ); + + $schema['queue_process_id'] = array( + 'description' => 'Stores queue process IDs, used to auto-incrament the process ID so that a unique process ID is used.', + 'fields' => array( + 'process_id' => array( + 'type' => 'serial', + 'not null' => TRUE, + 'description' => 'Primary Key: Unique process ID used to make sure only one consumer gets one item.', + ), + ), + 'primary key' => array('process_id'), + ); + db_create_table($ret, 'queue', $schema['queue']); + db_create_table($ret, 'queue_process_id', $schema['queue_process_id']); + + return $ret; +} + +/** * @} End of "defgroup updates-6.x-to-7.x" * The next series of updates should start at 8000. */ === modified file 'modules/system/system.module' --- modules/system/system.module 2009-04-22 09:45:02 +0000 +++ modules/system/system.module 2009-04-24 23:02:17 +0000 @@ -1577,6 +1577,10 @@ function system_cron() { foreach ($cache_tables as $table) { cache_clear_all(NULL, $table); } + + // Reset expired items in the default queue implementation table. If that's + // not used, no problems, the query will will be a no-op. + db_update('queue')->fields(array('process_id' => 0, 'expire' => 0))->condition('expire', time(), '<')->execute(); } /** === added file 'modules/system/system.queue.inc' --- modules/system/system.queue.inc 1970-01-01 00:00:00 +0000 +++ modules/system/system.queue.inc 2009-04-25 16:16:00 +0000 @@ -0,0 +1,219 @@ +addItem($item); +} + +/** + * Reserve an item in a queue. + * + * @param $queue_name + * Arbitrary string. The name of the queue to work with. + * @param $process_time + * How long the processing is expected to take in seconds, Defaults to an + * hour. After this time passes, the item will be reset and another process + * can reserve the item. + * @return + * A queue entry which is an object. The 'item' property contains the $item + * which is what was passed to system_queue_add_item(). This queue entry + * needs to be passed to system_queue_delete_item() once processing is + * completed. + */ +function system_queue_reserve_item($queue_name, $process_time = 3600) { + return _system_get_queue($queue_name)->reserveItem($process_time); +} + +/** + * Delete a finished entry from the queue. + * + * @param $entry + * The entry object returned by system_queue_reserve_item(). + */ +function system_queue_delete_item($entry) { + _system_get_queue($entry->queue_name)->deleteItem($entry->item_id); +} + +/** + * Create a queue. + * + * @param $queue_name + * Arbitrary string. The name of the queue to work with. + */ +function system_queue_create_queue($queue_name) { + _system_get_queue($queue_name)->createQueue(); +} + +/** + * Remove a queue and every item in the queue. + * + * @param $queue_name + * Arbitrary string. The name of the queue to work with. + */ +function system_queue_remove_queue($queue_name) { + _system_get_queue($queue_name)->removeQueue(); +} + +/** + * Default queue implementation. + * + * Do not use this class directly, use the system_queue_add_item(), + * system_queue_reserve_item(), system_queue_delete_item() and + * system_queue_remove_queue() functions. + */ +class systemQueue implements DrupalQueue { + protected $process_id; + + protected $queue_name; + + function __construct($queue_name) { + $this->queue_name = $queue_name; + } + + function addItem($item) { + $record->queue_name = $this->queue_name; + $record->item = $item; + $record->process_id = 0; + return drupal_write_record('queue', $record) !== FALSE; + } + + function reserveItem($process_time = 30) { + if (!isset($this->process_id)) { + $this->process_id = db_insert('queue_process_id')->useDefaults(array('process_id'))->execute(); + } + $start = time(); + $entry = FALSE; + $entry = db_query_range('SELECT item, item_id, queue_name FROM {queue} q WHERE process_id = 0 AND queue_name = :queue_name ORDER BY created ASC', array(':queue_name' => $this->queue_name), 0, 1)->fetchObject(); + if ($entry) { + // Try to mark the item as ours. + $update = db_update('queue') + ->fields(array('process_id' => $this->process_id, 'expire' => time() + $process_time)) + ->condition('item_id', $entry->item_id) + ->condition('process_id', 0); + // If there are affected rows, this update succeeded. + if ($update->execute()) { + $entry->item = unserialize($entry->item); + return $entry; + } + } + } + + function deleteItem($item_id) { + db_delete('queue')->condition('item_id', $item_id); + } + + function createQueue() { + // Our queues are created automatically on demand. + } + + function removeQueue() { + db_delete('queue')->condition('queue_name', $this->queue_name)->execute(); + } +} + +/** + * @} End of "defgroup queue". + */ === modified file 'modules/system/system.test' --- modules/system/system.test 2009-03-31 01:49:50 +0000 +++ modules/system/system.test 2009-04-25 00:11:30 +0000 @@ -878,7 +878,7 @@ class SystemThemeFunctionalTest extends 'node_admin_theme' => FALSE, ); $this->drupalPost('admin/build/themes', $edit, t('Save configuration')); - + $this->drupalGet('admin'); $this->assertRaw('themes/garland', t('Administration theme used on an administration page.')); @@ -887,7 +887,7 @@ class SystemThemeFunctionalTest extends // Reset to the default theme settings. $this->drupalPost('admin/build/themes', array(), t('Reset to defaults')); - + $this->drupalGet('admin'); $this->assertRaw('themes/garland', t('Site default theme used on administration page.')); @@ -895,3 +895,73 @@ class SystemThemeFunctionalTest extends $this->assertRaw('themes/garland', t('Site default theme used on the add content page.')); } } + + +class QueueTestCase extends DrupalWebTestCase { + function getInfo() { + return array( + 'name' => t('Queue functionality'), + 'description' => t('Queues and dequeues an item.'), + 'group' => t('System'), + ); + } + + function testQueue() { + // Create two queues for testing. + $queue1 = $this->randomName(); + $queue2 = $this->randomName(); + module_invoke('system', 'queue_create_queue', $queue1); + module_invoke('system', 'queue_create_queue', $queue2); + // Add four items. + $items = array(); + for ($i = 0; $i < 4; $i++) { + $items[] = array($this->randomName() => $this->randomName()); + } + module_invoke('system', 'queue_add_item', $queue1, $items[0]); + module_invoke('system', 'queue_add_item', $queue1, $items[1]); + // Retrieve two items from one queue. + $entry = module_invoke('system', 'queue_reserve_item', $queue1); + $entries[] = $entry; + $new_items[] = $entry->item; + $entry = module_invoke('system', 'queue_reserve_item', $queue1); + $entries[] = $entry; + $new_items[] = $entry->item; + // Two dequeued items should match the two items we queued. + $this->assertEqual($this->queueScore($items, $new_items), 2, t('Two items matched')); + // Add two more items. + module_invoke('system', 'queue_add_item', $queue1, $items[2]); + module_invoke('system', 'queue_add_item', $queue1, $items[3]); + $entry = module_invoke('system', 'queue_reserve_item', $queue1); + $entries[] = $entry; + $new_items[] = $entry->item; + $entry = module_invoke('system', 'queue_reserve_item', $queue1); + $entries[] = $entry; + $new_items[] = $entry->item; + // All dequeued items should match the items we queued exactly once, + // therefore the score must be exactly 4. + $this->assertEqual($this->queueScore($items, $new_items), 4, t('Four items matched')); + // There should not be equal dequeued items. Each item is equal to itself + // therefore the score must be exactly 4. + $this->assertEqual($this->queueScore($new_items, $new_items), 4, t('Four items matched')); + foreach ($entries as $entry) { + module_invoke('system', 'queue_delete_item', $entry); + } + $this->assertFalse(module_invoke('system', 'queue_reserve_item', $queue1), t('Queue 1 is empty')); + $this->assertFalse(module_invoke('system', 'queue_reserve_item', $queue2), t('Queue 2 is empty')); + } + + /** + * This function returns the number of equal items in two arrays. + */ + function queueScore($items, $new_items) { + $score = 0; + foreach ($items as $item) { + foreach ($new_items as $new_item) { + if ($item === $new_item) { + $score++; + } + } + } + return $score; + } +}