cvs diff: Diffing modules/system Index: modules/system/system.info =================================================================== RCS file: /cvs/drupal/drupal/modules/system/system.info,v retrieving revision 1.11 diff -u -p -u -p -r1.11 system.info --- modules/system/system.info 12 Oct 2008 01:23:06 -0000 1.11 +++ modules/system/system.info 3 May 2009 08:47:14 -0000 @@ -6,6 +6,7 @@ version = VERSION core = 7.x files[] = system.module files[] = system.admin.inc +files[] = system.queue.inc files[] = image.gd.inc files[] = system.install required = TRUE Index: modules/system/system.install =================================================================== RCS file: /cvs/drupal/drupal/modules/system/system.install,v retrieving revision 1.319 diff -u -p -u -p -r1.319 system.install --- modules/system/system.install 30 Apr 2009 21:44:18 -0000 1.319 +++ modules/system/system.install 3 May 2009 08:47:17 -0000 @@ -1052,6 +1052,67 @@ function system_schema() { 'primary key' => array('mlid'), ); + $schema['queue'] = array( + 'description' => 'Stores items in queues.', + 'fields' => array( + 'item_id' => array( + 'type' => 'serial', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'Primary Key: Unique item ID.', + ), + 'queue_name' => array( + 'type' => 'varchar', + 'length' => 255, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The queue name.', + ), + 'consumer_id' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The ID of the dequeuing consumer.', + ), + 'item' => array( + 'type' => 'text', + 'not null' => FALSE, + 'size' => 'big', + 'serialize' => TRUE, + 'description' => 'The item itself.', + ), + 'expire' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The time the item needs reset.', + ), + 'created' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'When the item needs reset.', + ), + ), + 'primary key' => array('item_id'), + 'indexes' => array( + 'consumer_queue' => array('consumer_id', 'queue_name', 'created'), + 'consumer_expire' => array('consumer_id', 'expire'), + ), + ); + + $schema['queue_consumer_id'] = array( + 'description' => 'Stores queue consumer IDs, used to auto-increment the consumer ID so that a unique consumer ID is used.', + 'fields' => array( + 'consumer_id' => array( + 'type' => 'serial', + 'not null' => TRUE, + 'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.', + ), + ), + 'primary key' => array('consumer_id'), + ); + $schema['registry'] = array( 'description' => "Each record is a function, class, or interface name and the file it is in.", 'fields' => array( @@ -3299,6 +3360,76 @@ function system_update_7021() { } /** + * Add the queue tables. + */ +function system_update_7022() { + $schema['queue'] = array( + 'description' => 'Stores items in queues.', + 'fields' => array( + 'item_id' => array( + 'type' => 'serial', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'Primary Key: Unique item ID.', + ), + 'queue_name' => array( + 'type' => 'varchar', + 'length' => 255, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The queue name.', + ), + 'consumer_id' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The ID of the dequeuing consumer.', + ), + 'item' => array( + 'type' => 'text', + 'not null' => FALSE, + 'size' => 'big', + 'serialize' => TRUE, + 'description' => 'The item itself.', + ), + 'expire' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The time the item needs reset.', + ), + 'created' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'When the item needs reset.', + ), + ), + 'primary key' => array('item_id'), + 'indexes' => array( + 'consumer_queue' => array('consumer_id', 'queue_name', 'created'), + 'consumer_expire' => array('consumer_id', 'expire'), + ), + ); + + $schema['queue_consumer_id'] = array( + 'description' => 'Stores queue consumer IDs, used to auto-incrament the consumer ID so that a unique consumer ID is used.', + 'fields' => array( + 'consumer_id' => array( + 'type' => 'serial', + 'not null' => TRUE, + 'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.', + ), + ), + 'primary key' => array('consumer_id'), + ); + db_create_table($ret, 'queue', $schema['queue']); + db_create_table($ret, 'queue_consumer_id', $schema['queue_consumer_id']); + + return $ret; +} + +/** * @} End of "defgroup updates-6.x-to-7.x" * The next series of updates should start at 8000. */ Index: modules/system/system.module =================================================================== RCS file: /cvs/drupal/drupal/modules/system/system.module,v retrieving revision 1.689 diff -u -p -u -p -r1.689 system.module --- modules/system/system.module 30 Apr 2009 21:44:19 -0000 1.689 +++ modules/system/system.module 3 May 2009 08:47:19 -0000 @@ -1587,6 +1587,16 @@ function system_cron() { foreach ($cache_tables as $table) { cache_clear_all(NULL, $table); } + + // Reset expired items in the default queue implementation table. If that's + // not used, this will simply be a no-op. + db_update('queue') + ->fields(array( + 'consumer_id' => 0, + 'expire' => 0, + )) + ->condition('expire', REQUEST_TIME, '<') + ->execute(); } /** Index: modules/system/system.queue.inc =================================================================== RCS file: modules/system/system.queue.inc diff -N modules/system/system.queue.inc --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ modules/system/system.queue.inc 3 May 2009 08:47:19 -0000 @@ -0,0 +1,214 @@ +queue_name = $queue_name; + } + + function addItem($item) { + $record->queue_name = $this->queue_name; + $record->item = $item; + $record->consumer_id = 0; + return drupal_write_record('queue', $record); + } + + function numberOfItems() { + return db_query('SELECT COUNT(item_id) FROM {queue}')->fetchField(); + } + + function claimItem($lease_time = 30) { + if (!isset($this->consumer_id)) { + $this->consumer_id = db_insert('queue_consumer_id') + ->useDefaults(array('consumer_id')) + ->execute(); + } + $start = REQUEST_TIME; + while (TRUE) { + $entry = FALSE; + $entry = db_query_range('SELECT item, item_id, queue_name FROM {queue} q WHERE consumer_id = 0 AND queue_name = :queue_name ORDER BY created ASC', array(':queue_name' => $this->queue_name), 0, 1)->fetchObject(); + if ($entry) { + // Try to mark the item as ours. + $update = db_update('queue') + ->fields(array( + 'consumer_id' => $this->consumer_id, + 'expire' => REQUEST_TIME + $lease_time, + )) + ->condition('item_id', $entry->item_id) + ->condition('consumer_id', 0); + // If there are affected rows, this update succeeded. + if ($update->execute()) { + $entry->item = unserialize($entry->item); + return $entry; + } + } + else { + // No items currently available to claim. + return NULL; + } + } + } + + function deleteItem($item) { + db_delete('queue') + ->condition('item_id', $item->item_id) + ->execute(); + } + + function createQueue() { + // Our queues are created automatically on demand. + } + + function removeQueue() { + db_delete('queue') + ->condition('queue_name', $this->queue_name) + ->execute(); + } +} + +/** + * @} End of "defgroup queue". + */ Index: modules/system/system.test =================================================================== RCS file: /cvs/drupal/drupal/modules/system/system.test,v retrieving revision 1.40 diff -u -p -u -p -r1.40 system.test --- modules/system/system.test 31 Mar 2009 01:49:54 -0000 1.40 +++ modules/system/system.test 3 May 2009 08:47:19 -0000 @@ -878,7 +878,7 @@ class SystemThemeFunctionalTest extends 'node_admin_theme' => FALSE, ); $this->drupalPost('admin/build/themes', $edit, t('Save configuration')); - + $this->drupalGet('admin'); $this->assertRaw('themes/garland', t('Administration theme used on an administration page.')); @@ -887,7 +887,7 @@ class SystemThemeFunctionalTest extends // Reset to the default theme settings. $this->drupalPost('admin/build/themes', array(), t('Reset to defaults')); - + $this->drupalGet('admin'); $this->assertRaw('themes/garland', t('Site default theme used on administration page.')); @@ -895,3 +895,92 @@ class SystemThemeFunctionalTest extends $this->assertRaw('themes/garland', t('Site default theme used on the add content page.')); } } + + +/** + * Test the basic queue functionality. + */ +class QueueTestCase extends DrupalWebTestCase { + function getInfo() { + return array( + 'name' => t('Queue functionality'), + 'description' => t('Queues and dequeues a set of items to check the basic queue functionality.'), + 'group' => t('System'), + ); + } + + /** + * Queues and dequeues a set of items to check the basic queue functionality. + */ + function testQueue() { + // Create two queues. + DrupalQueue::get($queue1 = $this->randomName())->createQueue(); + DrupalQueue::get($queue2 = $this->randomName())->createQueue(); + + // Create four items. + $items = array(); + for ($i = 0; $i < 4; $i++) { + $items[] = array($this->randomName() => $this->randomName()); + } + + // Queue items 1 and 2 in the queue1. + DrupalQueue::get($queue1)->addItem($items[0]); + DrupalQueue::get($queue1)->addItem($items[1]); + + // Retrieve two items from queue1. + $entries = array(); + $new_items = array(); + + $entries[] = $entry = DrupalQueue::get($queue1)->claimItem(); + $new_items[] = $entry->item; + + $entries[] = $entry = DrupalQueue::get($queue1)->claimItem(); + $new_items[] = $entry->item; + + // First two dequeued items should match the first two items we queued. + $this->assertEqual($this->queueScore($items, $new_items), 2, t('Two items matched')); + + // Add two more items. + DrupalQueue::get($queue1)->addItem($items[2]); + DrupalQueue::get($queue1)->addItem($items[3]); + + $this->assertTrue(DrupalQueue::get($queue1)->numberOfItems(), t('Queue 1 is not empty after adding items.')); + + $entries[] = $entry = DrupalQueue::get($queue1)->claimItem(); + $new_items[] = $entry->item; + + $entries[] = $entry = DrupalQueue::get($queue1)->claimItem(); + $new_items[] = $entry->item; + + // All dequeued items should match the items we queued exactly once, + // therefore the score must be exactly 4. + $this->assertEqual($this->queueScore($items, $new_items), 4, t('Four items matched')); + + // There should be no duplicate items. + $this->assertEqual($this->queueScore($new_items, $new_items), 4, t('Four items matched')); + + // Delete all items from queue1. + foreach ($entries as $entry) { + DrupalQueue::get($queue1)->deleteItem($entry); + } + + // Check that both queues are empty. + $this->assertFalse(DrupalQueue::get($queue1)->numberOfItems(), t('Queue 1 is empty')); + $this->assertFalse(DrupalQueue::get($queue2)->numberOfItems(), t('Queue 2 is empty')); + } + + /** + * This function returns the number of equal items in two arrays. + */ + function queueScore($items, $new_items) { + $score = 0; + foreach ($items as $item) { + foreach ($new_items as $new_item) { + if ($item === $new_item) { + $score++; + } + } + } + return $score; + } +}