=== modified file 'includes/common.inc' --- includes/common.inc 2009-02-28 07:36:06 +0000 +++ includes/common.inc 2009-03-12 15:22:07 +0000 @@ -2980,6 +2980,7 @@ function _drupal_bootstrap_full() { require_once DRUPAL_ROOT . '/includes/form.inc'; require_once DRUPAL_ROOT . '/includes/mail.inc'; require_once DRUPAL_ROOT . '/includes/actions.inc'; + require_once DRUPAL_ROOT . '/includes/queue.inc'; // Set the Drupal custom error handler. set_error_handler('_drupal_error_handler'); set_exception_handler('_drupal_exception_handler'); === added file 'includes/queue.inc' --- includes/queue.inc 1970-01-01 00:00:00 +0000 +++ includes/queue.inc 2009-03-12 15:35:34 +0000 @@ -0,0 +1,95 @@ +queue($item); +} + + +/** + * Reserve an item in a queue. + * + * @param $queue_name + * Arbitrary string. The name of the queue to work with. + * @param $process_time + * How long the processing is expected to take in seconds, Defaults to an + * hour After this time passes, the item will be reset and another process + * can reserve the item. + * @return + * A queue entry which is an object. The 'item' property contains the $item + * which is what was passed to queue(). This queue entry needs to be passed + * to queue_reserve() once processing is completed. + */ +function queue_reserve($queue_name, $process_time = 3600) { + return _queue_get_queue($queue_name)->dequeue($process_time, $wait_time); +} + +/** + * Delete a finished entry from the queue. + * + * @param $entry + * The entry object returned by queue_reserve(). + */ +function queue_delete($entry) { + return _queue_get_queue($entry->queue_name)->finish($entry->item_id); +} === added directory 'modules/queue' === added file 'modules/queue/queue.info' --- modules/queue/queue.info 1970-01-01 00:00:00 +0000 +++ modules/queue/queue.info 2009-03-12 15:22:07 +0000 @@ -0,0 +1,9 @@ +; $Id$ + +name = Queue +description = Default queue implementation +package = Core +version = VERSION +core = 7.x +files[] = queue.module +files[] = queue.install === added file 'modules/queue/queue.install' --- modules/queue/queue.install 1970-01-01 00:00:00 +0000 +++ modules/queue/queue.install 2009-03-12 15:33:59 +0000 @@ -0,0 +1,66 @@ + 'Stores items in queues.', + 'fields' => array( + 'item_id' => array( + 'type' => 'serial', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'Primary Key: Unique item ID.', + ), + 'queue_name' => array( + 'type' => 'varchar', + 'length' => 255, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The queue name.', + ), + 'process_id' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The ID of the dequeuing process.', + ), + 'item' => array( + 'type' => 'text', + 'not null' => FALSE, + 'size' => 'big', + 'serialize' => TRUE, + 'description' => 'The item itself.', + ), + 'expire' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The time the item needs reset.', + ), + ), + 'primary key' => array('item_id'), + 'indexes' => array( + 'process_queue' => array('process_id', 'queue_name'), + 'process_expire' => array('process_id', 'expire'), + ), + ); + + $schema['queue_process_id'] = array( + 'description' => 'Stores queue process IDs, used to auto-incrament the process ID so that a unique process ID is used.', + 'fields' => array( + 'process_id' => array( + 'type' => 'serial', + 'not null' => TRUE, + 'description' => 'Primary Key: Unique process ID used to make sure only one consumer gets one item.', + ), + ), + 'primary key' => array('process_id'), + ); + return $schema; +} \ No newline at end of file === added file 'modules/queue/queue.module' --- modules/queue/queue.module 1970-01-01 00:00:00 +0000 +++ modules/queue/queue.module 2009-03-12 15:36:11 +0000 @@ -0,0 +1,57 @@ +queue_name = $queue_name; + } + + function add($item) { + $record->queue_name = $this->queue_name; + $record->item = $item; + $record->process_id = 0; + return drupal_write_record('queue', $record); + } + + function reserve($process_time = 30) { + if (!isset($this->process_id)) { + $this->process_id = db_insert('queue_process_id')->useDefaults(array('process_id'))->execute(); + } + $start = time(); + $entry = FALSE; + $statement = db_select('queue', 'q'); + $statement->addField('q', 'item_id'); + $statement->condition('process_id', 0); + $statement->condition('queue_name', $this->queue_name); + $statement->range(0, 1); + $item_id = $statement->execute()->fetchField(); + if ($item_id) { + // Try to mark the item as ours. + db_update('queue')->fields(array('process_id' => $this->process_id, 'expire' => time() + $process_time))->condition('item_id', $item_id)->condition('process_id', 0)->execute(); + // Grab the entry if it's ours. + $entry = db_query('SELECT item, item_id, queue_name FROM {queue} WHERE process_id = :process_id AND item_id = :item_id', array('process_id' => $this->process_id, 'item_id' => $item_id))->fetchObject(); + } + if ($entry) { + $entry->item = unserialize($entry->item); + return $entry; + } + } + + function delete($item_id) { + db_delete('queue')->condition('item_id', $item_id); + } +} + +/** + * Reset expired items. + */ +function queue_cron() { + db_update('queue')->fields(array('process_id' => 0, 'expire' => 0))->condition('expire', time(), '<')->condition('process_id', 0, '>'); +}