diff --git a/core/includes/common.inc b/core/includes/common.inc index bb14e95..f35095c 100644 --- a/core/includes/common.inc +++ b/core/includes/common.inc @@ -6719,7 +6719,7 @@ function drupal_get_filetransfer_info() { * @param string $name * The name of the queue to work with. * @param bool $reliable - * TRUE if the ordering of items and guaranteeing every item executes at + * (optional) TRUE if the ordering of items and guaranteeing every item executes at * least once is important, FALSE if scalability is the main concern. Defaults * to FALSE. * @@ -6729,18 +6729,7 @@ function drupal_get_filetransfer_info() { * @see Drupal\Core\Queue\QueueInterface */ function queue($name, $reliable = FALSE) { - static $queues; - if (!isset($queues[$name])) { - $class = variable_get('queue_class_' . $name, NULL); - if ($class && $reliable && in_array('Drupal\Core\Queue\ReliableQueueInterface', class_implements($class))) { - $class = variable_get('queue_default_reliable_class', 'Drupal\Core\Queue\System'); - } - elseif (!$class) { - $class = variable_get('queue_default_class', 'Drupal\Core\Queue\System'); - } - $queues[$name] = new $class($name); - } - return $queues[$name]; + return drupal_container()->get('queue')->get($name, $reliable); } /** diff --git a/core/includes/form.inc b/core/includes/form.inc index 2659d06..3cb15e8 100644 --- a/core/includes/form.inc +++ b/core/includes/form.inc @@ -6,8 +6,9 @@ */ use Drupal\Component\Utility\NestedArray; -use Drupal\Core\Utility\Color; +use Drupal\Core\Database\Database; use Drupal\Core\Template\Attribute; +use Drupal\Core\Utility\Color; /** * @defgroup forms Form builder functions @@ -5193,7 +5194,7 @@ function _batch_queue($batch_set) { $class = $batch_set['queue']['class']; if (!isset($queues[$class][$name])) { - $queues[$class][$name] = new $class($name); + $queues[$class][$name] = new $class($name, Database::getConnection()); } return $queues[$class][$name]; } diff --git a/core/lib/Drupal/Core/CoreBundle.php b/core/lib/Drupal/Core/CoreBundle.php index 691e6e7..f15de72 100644 --- a/core/lib/Drupal/Core/CoreBundle.php +++ b/core/lib/Drupal/Core/CoreBundle.php @@ -73,6 +73,14 @@ public function build(ContainerBuilder $container) { ->register('keyvalue.database', 'Drupal\Core\KeyValueStore\KeyValueDatabaseFactory') ->addArgument(new Reference('database')); + // Register the Queue factory. + $container + ->register('queue', 'Drupal\Core\Queue\QueueFactory') + ->addArgument(new Reference('service_container')); + $container + ->register('queue.database', 'Drupal\Core\Queue\QueueDatabaseFactory') + ->addArgument(new Reference('database')); + $container->register('path.alias_manager', 'Drupal\Core\Path\AliasManager') ->addArgument(new Reference('database')) ->addArgument(new Reference('keyvalue')); diff --git a/core/lib/Drupal/Core/Queue/Batch.php b/core/lib/Drupal/Core/Queue/Batch.php index a1de48a..56ec94f 100644 --- a/core/lib/Drupal/Core/Queue/Batch.php +++ b/core/lib/Drupal/Core/Queue/Batch.php @@ -18,7 +18,7 @@ * Stale items from failed batches are cleaned from the {queue} table on cron * using the 'created' date. */ -class Batch extends System { +class Batch extends DatabaseQueue { /** * Overrides Drupal\Core\Queue\System::claimItem(). diff --git a/core/lib/Drupal/Core/Queue/DatabaseQueue.php b/core/lib/Drupal/Core/Queue/DatabaseQueue.php new file mode 100644 index 0000000..9ba7717 --- /dev/null +++ b/core/lib/Drupal/Core/Queue/DatabaseQueue.php @@ -0,0 +1,140 @@ +name = $name; + $this->connection = $connection; + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::createItem(). + */ + public function createItem($data) { + $query = $this->connection->insert('queue') + ->fields(array( + 'name' => $this->name, + 'data' => serialize($data), + // We cannot rely on REQUEST_TIME because many items might be created + // by a single request which takes longer than 1 second. + 'created' => time(), + )); + return (bool) $query->execute(); + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::numberOfItems(). + */ + public function numberOfItems() { + return $this->connection->query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField(); + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::claimItem(). + */ + public function claimItem($lease_time = 30) { + // Claim an item by updating its expire fields. If claim is not successful + // another thread may have claimed the item in the meantime. Therefore loop + // until an item is successfully claimed or we are reasonably sure there + // are no unclaimed items left. + while (TRUE) { + $item = $this->connection->queryRange('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject(); + if ($item) { + // Try to update the item. Only one thread can succeed in UPDATEing the + // same row. We cannot rely on REQUEST_TIME because items might be + // claimed by a single consumer which runs longer than 1 second. If we + // continue to use REQUEST_TIME instead of the current time(), we steal + // time from the lease, and will tend to reset items before the lease + // should really expire. + $update = $this->connection->update('queue') + ->fields(array( + 'expire' => time() + $lease_time, + )) + ->condition('item_id', $item->item_id) + ->condition('expire', 0); + // If there are affected rows, this update succeeded. + if ($update->execute()) { + $item->data = unserialize($item->data); + return $item; + } + } + else { + // No items currently available to claim. + return FALSE; + } + } + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::releaseItem(). + */ + public function releaseItem($item) { + $update = $this->connection->update('queue') + ->fields(array( + 'expire' => 0, + )) + ->condition('item_id', $item->item_id); + return $update->execute(); + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::deleteItem(). + */ + public function deleteItem($item) { + $this->connection->delete('queue') + ->condition('item_id', $item->item_id) + ->execute(); + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::createQueue(). + */ + public function createQueue() { + // All tasks are stored in a single database table (which is created when + // Drupal is first installed) so there is nothing we need to do to create + // a new queue. + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::deleteQueue(). + */ + public function deleteQueue() { + $this->connection->delete('queue') + ->condition('name', $this->name) + ->execute(); + } +} diff --git a/core/lib/Drupal/Core/Queue/QueueDatabaseFactory.php b/core/lib/Drupal/Core/Queue/QueueDatabaseFactory.php new file mode 100644 index 0000000..15d62f6 --- /dev/null +++ b/core/lib/Drupal/Core/Queue/QueueDatabaseFactory.php @@ -0,0 +1,47 @@ +connection = $connection; + } + + /** + * Constructs a new queue object for a given name. + * + * @param string $name + * The name of the collection holding key and value pairs. + * @param \Drupal\Core\Database\Connection $connection + * The connection to run against. + * @return \Drupal\Core\QueueStore\DatabaseStorage + * A key/value store implementation for the given $collection. + */ + public function get($name) { + return new DatabaseQueue($name, $this->connection); + } +} diff --git a/core/lib/Drupal/Core/Queue/QueueFactory.php b/core/lib/Drupal/Core/Queue/QueueFactory.php new file mode 100644 index 0000000..893be34 --- /dev/null +++ b/core/lib/Drupal/Core/Queue/QueueFactory.php @@ -0,0 +1,70 @@ +container = $container; + } + + /** + * Constructs a new key/value store for a given collection name. + * + * @param string $name + * The name of the queue to work with. + * @param bool $reliable + * (optional) TRUE if the ordering of items and guaranteeing every item executes at + * least once is important, FALSE if scalability is the main concern. Defaults + * to FALSE. + * + * @return \Drupal\Core\QueueStore\QueueInterface + * A key/value store implementation for the given $collection. + */ + public function get($name, $reliable = FALSE) { + global $conf; + if (!isset($this->queues[$name])) { + if ($reliable && isset($conf['queue_reliable_service_' . $name])) { + $service_name = $conf['queue_reliable_service_' . $name]; + } + elseif (isset($conf['queue_service_' . $name])) { + $service_name = $conf['queue_service_' . $name]; + } + elseif (isset($conf['queue_default'])) { + $service_name = $conf['queue_default']; + } + else { + $service_name = 'queue.database'; + } + $this->queues[$name] = $this->container->get($service_name)->get($name); + } + return $this->queues[$name]; + } +} + diff --git a/core/lib/Drupal/Core/Queue/System.php b/core/lib/Drupal/Core/Queue/System.php deleted file mode 100644 index 71c5e7f..0000000 --- a/core/lib/Drupal/Core/Queue/System.php +++ /dev/null @@ -1,129 +0,0 @@ -name = $name; - } - - /** - * Implements Drupal\Core\Queue\QueueInterface::createItem(). - */ - public function createItem($data) { - // During a Drupal 6.x to 8.x update, drupal_get_schema() does not contain - // the queue table yet, so we cannot rely on drupal_write_record(). - $query = db_insert('queue') - ->fields(array( - 'name' => $this->name, - 'data' => serialize($data), - // We cannot rely on REQUEST_TIME because many items might be created - // by a single request which takes longer than 1 second. - 'created' => time(), - )); - return (bool) $query->execute(); - } - - /** - * Implements Drupal\Core\Queue\QueueInterface::numberOfItems(). - */ - public function numberOfItems() { - return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField(); - } - - /** - * Implements Drupal\Core\Queue\QueueInterface::claimItem(). - */ - public function claimItem($lease_time = 30) { - // Claim an item by updating its expire fields. If claim is not successful - // another thread may have claimed the item in the meantime. Therefore loop - // until an item is successfully claimed or we are reasonably sure there - // are no unclaimed items left. - while (TRUE) { - $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject(); - if ($item) { - // Try to update the item. Only one thread can succeed in UPDATEing the - // same row. We cannot rely on REQUEST_TIME because items might be - // claimed by a single consumer which runs longer than 1 second. If we - // continue to use REQUEST_TIME instead of the current time(), we steal - // time from the lease, and will tend to reset items before the lease - // should really expire. - $update = db_update('queue') - ->fields(array( - 'expire' => time() + $lease_time, - )) - ->condition('item_id', $item->item_id) - ->condition('expire', 0); - // If there are affected rows, this update succeeded. - if ($update->execute()) { - $item->data = unserialize($item->data); - return $item; - } - } - else { - // No items currently available to claim. - return FALSE; - } - } - } - - /** - * Implements Drupal\Core\Queue\QueueInterface::releaseItem(). - */ - public function releaseItem($item) { - $update = db_update('queue') - ->fields(array( - 'expire' => 0, - )) - ->condition('item_id', $item->item_id); - return $update->execute(); - } - - /** - * Implements Drupal\Core\Queue\QueueInterface::deleteItem(). - */ - public function deleteItem($item) { - db_delete('queue') - ->condition('item_id', $item->item_id) - ->execute(); - } - - /** - * Implements Drupal\Core\Queue\QueueInterface::createQueue(). - */ - public function createQueue() { - // All tasks are stored in a single database table (which is created when - // Drupal is first installed) so there is nothing we need to do to create - // a new queue. - } - - /** - * Implements Drupal\Core\Queue\QueueInterface::deleteQueue(). - */ - public function deleteQueue() { - db_delete('queue') - ->condition('name', $this->name) - ->execute(); - } -} diff --git a/core/modules/system/lib/Drupal/system/Tests/Queue/QueueTest.php b/core/modules/system/lib/Drupal/system/Tests/Queue/QueueTest.php index 5767587..fe38f35 100644 --- a/core/modules/system/lib/Drupal/system/Tests/Queue/QueueTest.php +++ b/core/modules/system/lib/Drupal/system/Tests/Queue/QueueTest.php @@ -7,14 +7,23 @@ namespace Drupal\system\Tests\Queue; +use Drupal\Core\Database\Database; +use Drupal\Core\Queue\DatabaseQueue; use Drupal\Core\Queue\Memory; -use Drupal\Core\Queue\System; -use Drupal\simpletest\WebTestBase; +use Drupal\simpletest\DrupalUnitTestBase; /** * Tests the basic queue functionality. */ -class QueueTest extends WebTestBase { +class QueueTest extends DrupalUnitTestBase { + + /** + * The modules to enable. + * + * @var array + */ + public static $modules = array('system'); + public static function getInfo() { return array( 'name' => 'Queue functionality', @@ -27,10 +36,11 @@ public static function getInfo() { * Tests the System queue. */ public function testSystemQueue() { + $this->installSchema('system', 'queue'); // Create two queues. - $queue1 = new System($this->randomName()); + $queue1 = new DatabaseQueue($this->randomName(), Database::getConnection()); $queue1->createQueue(); - $queue2 = new System($this->randomName()); + $queue2 = new DatabaseQueue($this->randomName(), Database::getConnection()); $queue2->createQueue(); $this->queueTest($queue1, $queue2);