diff --git a/README.txt b/README.txt index d101bf8..8f761b9 100644 --- a/README.txt +++ b/README.txt @@ -128,6 +128,10 @@ Description: The table used by FeedsDataProcessor to store feed items. Usually a and the importer's id ($importer_id). This default table name can be overridden by defining a variable with the same name. +Name: feeds_node_batch_size +Default: 20 + The number of nodes feed node processor creates in one batch. + Glossary ======== diff --git a/feeds.install b/feeds.install index cfa165a..83dd510 100644 --- a/feeds.install +++ b/feeds.install @@ -70,6 +70,13 @@ function feeds_schema() { 'not null' => TRUE, 'description' => t('Main source resource identifier. E. g. a path or a URL.'), ), + 'batch' => array( + 'type' => 'text', + 'size' => 'big', + 'not null' => FALSE, + 'description' => t('Cache for batching.'), + 'serialize' => TRUE, + ), ), 'primary key' => array('id', 'feed_nid'), 'indexes' => array( @@ -316,4 +323,22 @@ function feeds_update_6007() { db_add_field($ret, 'feeds_node_item', 'hash', $spec); return $ret; +} + +/** + * Add feed field to source table. + */ +function feeds_update_6008() { + $ret = array(); + + $spec = array( + 'type' => 'text', + 'size' => 'big', + 'not null' => FALSE, + 'description' => t('Cache for batching.'), + 'serialize' => TRUE, + ); + db_add_field($ret, 'feeds_source', 'batch', $spec); + + return $ret; } \ No newline at end of file diff --git a/feeds.module b/feeds.module index af1a99f..ea226a0 100644 --- a/feeds.module +++ b/feeds.module @@ -291,7 +291,7 @@ function feeds_nodeapi(&$node, $op, $form) { // Refresh feed if import on create is selected and suppress_import is // not set. if ($op == 'insert' && feeds_importer($importer_id)->config['import_on_create'] && !isset($node->feeds['suppress_import'])) { - $source->import(); + feeds_batch_set(t('Importing'), 'import', $importer_id, $node->nid); } // Add import to scheduler. feeds_scheduler()->add($importer_id, 'import', $node->nid); @@ -378,6 +378,62 @@ function feeds_scheduler_work($feed_info) { */ /** + * @defgroup batch Batch functions. + */ + +/** + * Batch callback. + * + * @param $method + * Method to execute on importer; one of 'import', 'clear' or 'expire'. + * @param $importer_id + * Identifier of a FeedsImporter object. + * @param $feed_nid + * If importer is attached to content type, feed node id identifying the + * source to be imported. + * @param $context + * Batch context. + */ +function feeds_batch($method, $importer_id, $feed_nid = 0, &$context) { + switch ($method) { + case 'import': + $batch_status = feeds_source($importer_id, $feed_nid)->import(); + break; + case 'clear': + $batch_status = feeds_source($importer_id, $feed_nid)->clear(); + break; + } + $context['finished'] = ($batch_status != FEEDS_BATCH_ACTIVE); +} + +/** + * Batch helper. + * + * @param $title + * Title to show to user when executing batch. + * @param $method + * Method to execute on importer; one of 'import', 'clear' or 'expire'. + * @param $importer_id + * Identifier of a FeedsImporter object. + * @param $feed_nid + * If importer is attached to content type, feed node id identifying the + * source to be imported. + */ +function feeds_batch_set($title, $method, $importer_id, $feed_nid = 0) { + $batch = array( + 'title' => $title, + 'operations' => array( + array('feeds_batch', array($method, $importer_id, $feed_nid)), + ), + ); + batch_set($batch); +} + +/** + * @} End of "defgroup batch". + */ + +/** * @defgroup utility Utility functions * @{ */ diff --git a/feeds.pages.inc b/feeds.pages.inc index 415ad6f..94d3eb4 100644 --- a/feeds.pages.inc +++ b/feeds.pages.inc @@ -83,7 +83,7 @@ function feeds_import_form_submit($form, &$form_state) { // Refresh feed if import on create is selected. if ($source->importer->config['import_on_create']) { - $source->import(); + feeds_batch_set(t('Importing'), 'import', $form['#importer_id']); } // Add importer to schedule. @@ -107,7 +107,7 @@ function feeds_import_tab_form(&$form_state, $node) { * Submit handler for feeds_import_tab_form(). */ function feeds_import_tab_form_submit($form, $form_state) { - feeds_source($form['#importer_id'], $form['#feed_nid'])->import(); + feeds_batch_set(t('Importing'), 'import', $form['#importer_id'], $form['#feed_nid']); } /** @@ -134,5 +134,5 @@ function feeds_delete_tab_form(&$form_state, $importer_id, $node = NULL) { * Submit handler for feeds_delete_tab_form(). */ function feeds_delete_tab_form_submit($form, &$form_state) { - feeds_source($form['#importer_id'], empty($form['#feed_nid']) ? 0 : $form['#feed_nid'])->clear(); + feeds_batch_set(t('Deleting'), 'clear', $form['#importer_id'], empty($form['#feed_nid']) ? 0 : $form['#feed_nid']); } diff --git a/includes/FeedsFeed.inc b/includes/FeedsFeed.inc index c9f5f1d..4d47433 100644 --- a/includes/FeedsFeed.inc +++ b/includes/FeedsFeed.inc @@ -9,12 +9,13 @@ * @see FeedsSource class * @see FeedsFetcher class */ -class FeedsFeed { +class FeedsFeed extends FeedsBatch { + // Properties of this feed. protected $url; protected $file_path; protected $raw; - protected $items; + public $items; protected $link; /** @@ -26,6 +27,7 @@ class FeedsFeed { $this->url = $url; $this->file_path = $file_path; $this->items = array(); + parent::__construct(); } /** diff --git a/includes/FeedsImporter.inc b/includes/FeedsImporter.inc index b3428fb..5237873 100644 --- a/includes/FeedsImporter.inc +++ b/includes/FeedsImporter.inc @@ -11,6 +11,9 @@ require_once(dirname(__FILE__) .'/FeedsConfigurable.inc'); require_once(dirname(__FILE__) .'/FeedsSource.inc'); require_once(dirname(__FILE__) .'/FeedsFeed.inc'); +// Batch status of a FeedsImporter operation. +define('FEEDS_BATCH_ACTIVE', 'active'); + /** * Class defining an importer object. This is the main hub for Feeds module's * functionality. @@ -65,7 +68,7 @@ class FeedsImporter extends FeedsConfigurable { */ public function expire($time = NULL) { try { - $this->processor->expire($time); + return $this->processor->expire($time); } catch (Exception $e) { drupal_set_message($e->getMessage(), 'error'); diff --git a/includes/FeedsScheduler.inc b/includes/FeedsScheduler.inc index c1ea746..0916eed 100644 --- a/includes/FeedsScheduler.inc +++ b/includes/FeedsScheduler.inc @@ -208,7 +208,14 @@ class FeedsScheduler implements FeedsSchedulerInterface { // There are 2 possible callbacks: expire or 'import'. if ($feed_info['callback'] == 'expire') { try { - $importer->expire(); + // @todo: support batching on cron by unflagging after importing. + // - unflag after import + // - set last_scheduled_time only if batch is complete. + // - will require to rename last_scheduled_time to last_import_time. + do { + $result = $importer->expire(); + } + while ($result == FEEDS_BATCH_ACTIVE); } catch (Exception $e) { watchdog('FeedsScheduler', $e->getMessage(), array(), WATCHDOG_ERROR); @@ -222,7 +229,10 @@ class FeedsScheduler implements FeedsSchedulerInterface { return; } try { - $source->import(); + do { + $result = $source->import(); + } + while ($result == FEEDS_BATCH_ACTIVE); } catch (Exception $e) { watchdog('FeedsScheduler', $e->getMessage(), array(), WATCHDOG_ERROR); diff --git a/includes/FeedsSource.inc b/includes/FeedsSource.inc index 469dfce..165a4c6 100644 --- a/includes/FeedsSource.inc +++ b/includes/FeedsSource.inc @@ -42,6 +42,22 @@ interface FeedsSourceInterface { } /** + * A FeedsBatch object holds the state of an import or clear batch. Used in + * FeedsSource class. + */ +class FeedsBatch { + // Public counters for easier access. + public $created; + public $updated; + public $deleted; + public function __construct() { + $this->created = 0; + $this->updated = 0; + $this->deleted = 0; + } +} + +/** * This class encapsulates a source of a feed. It stores where the feed can be * found and how to import it. * @@ -75,6 +91,9 @@ class FeedsSource extends FeedsConfigurable { // The FeedsImporter object that this source is expected to be used with. protected $importer; + // A FeedsBatch object. NULL if there is no active batch. + protected $batch; + /** * Instantiate a unique object per class/id/feed_nid. Don't use * directly, use feeds_source() instead. @@ -107,10 +126,16 @@ class FeedsSource extends FeedsConfigurable { */ public function import() { try { - $feed = $this->importer->fetcher->fetch($this); - $this->importer->parser->parse($feed, $this); - $this->importer->processor->process($feed, $this); - unset($feed); + if (!$this->batch) { + $this->batch = $this->importer->fetcher->fetch($this); + $this->importer->parser->parse($this->batch, $this); + } + if (FEEDS_BATCH_ACTIVE == $this->importer->processor->process($this->batch, $this)) { + $this->save(); + return FEEDS_BATCH_ACTIVE; + } + unset($this->batch); + $this->save(); } catch (Exception $e) { drupal_set_message($e->getMessage(), 'error'); @@ -125,7 +150,15 @@ class FeedsSource extends FeedsConfigurable { try { $this->importer->fetcher->clear($this); $this->importer->parser->clear($this); - $this->importer->processor->clear($this); + if (!$this->batch) { + $this->batch = new FeedsBatch(); + } + if (FEEDS_BATCH_ACTIVE == $this->importer->processor->clear($this->batch, $this)) { + $this->save(); + return FEEDS_BATCH_ACTIVE; + } + unset($this->batch); + $this->save(); } catch (Exception $e) { drupal_set_message($e->getMessage(), 'error'); @@ -148,6 +181,7 @@ class FeedsSource extends FeedsConfigurable { 'feed_nid' => $this->feed_nid, 'config' => $config, 'source' => $source, + 'batch' => isset($this->batch) ? $this->batch : FALSE, ); // Make sure a source record is present at all time, try to update first, // then insert. @@ -161,13 +195,14 @@ class FeedsSource extends FeedsConfigurable { * Load configuration and unpack. */ public function load() { - if ($config = db_result(db_query('SELECT config FROM {feeds_source} WHERE id = "%s" AND feed_nid = %d', $this->id, $this->feed_nid))) { + if ($record = db_fetch_object(db_query('SELECT config, batch FROM {feeds_source} WHERE id = "%s" AND feed_nid = %d', $this->id, $this->feed_nid))) { // While FeedsSource cannot be exported, we still use CTool's export.inc // export definitions. // @todo: patch CTools to move constants from export.inc to ctools.module. ctools_include('export'); $this->export_type = EXPORT_IN_DATABASE; - $this->config = unserialize($config); + $this->config = unserialize($record->config); + $this->batch = unserialize($record->batch); } } diff --git a/plugins/FeedsDataProcessor.inc b/plugins/FeedsDataProcessor.inc index 6711ab8..8743510 100644 --- a/plugins/FeedsDataProcessor.inc +++ b/plugins/FeedsDataProcessor.inc @@ -57,7 +57,7 @@ class FeedsDataProcessor extends FeedsProcessor { * * Delete all data records for feed_nid in this table. */ - public function clear(FeedsSource $source) { + public function clear(FeedsBatch $batch, FeedsSource $source) { $clause = array( 'feed_nid' => $source->feed_nid, ); diff --git a/plugins/FeedsFeedNodeProcessor.inc b/plugins/FeedsFeedNodeProcessor.inc index a7317e3..477ca48 100644 --- a/plugins/FeedsFeedNodeProcessor.inc +++ b/plugins/FeedsFeedNodeProcessor.inc @@ -62,7 +62,7 @@ class FeedsFeedNodeProcessor extends FeedsProcessor { /** * Implementation of FeedsProcessor::clear(). */ - public function clear(FeedsSource $source) { + public function clear(FeedsBatch $batch, FeedsSource $source) { // Do not support deleting imported items as we would have to delete all // items of the content type we imported which may contain nodes that a // user created by hand. diff --git a/plugins/FeedsNodeProcessor.inc b/plugins/FeedsNodeProcessor.inc index 6c583ca..04d8df0 100644 --- a/plugins/FeedsNodeProcessor.inc +++ b/plugins/FeedsNodeProcessor.inc @@ -17,7 +17,7 @@ class FeedsNodeProcessor extends FeedsProcessor { public function process(FeedsFeed $feed, FeedsSource $source) { // Count number of created and updated nodes. - $created = $updated = 0; + $processed = 0; while ($item = $feed->shiftItem()) { @@ -38,10 +38,10 @@ class FeedsNodeProcessor extends FeedsProcessor { // If updating populate nid and vid avoiding an expensive node_load(). $node->nid = $nid; $node->vid = db_result(db_query('SELECT vid FROM {node} WHERE nid = %d', $nid)); - $updated++; + $feed->updated++; } else { - $created++; + $feed->created++; } // Populate and prepare node object. @@ -68,14 +68,21 @@ class FeedsNodeProcessor extends FeedsProcessor { // Save the node. node_save($node); } + + // Return FEEDS_BATCH_ACTIVE batch size is reached and items are not + // completely consumed yet. + $processed++; + if ($processed > variable_get('feeds_node_batch_size', 1)) { + return FEEDS_BATCH_ACTIVE; + } } // Set messages. - if ($created) { - drupal_set_message(t('Created !number !type nodes.', array('!number' => $created, '!type' => node_get_types('name', $this->config['content_type'])))); + if ($feed->created) { + drupal_set_message(t('Created !number !type nodes.', array('!number' => $feed->created, '!type' => node_get_types('name', $this->config['content_type'])))); } - elseif ($updated) { - drupal_set_message(t('Updated !number !type nodes.', array('!number' => $updated, '!type' => node_get_types('name', $this->config['content_type'])))); + elseif ($feed->updated) { + drupal_set_message(t('Updated !number !type nodes.', array('!number' => $feed->updated, '!type' => node_get_types('name', $this->config['content_type'])))); } else { drupal_set_message(t('There is no new content.')); @@ -84,21 +91,22 @@ class FeedsNodeProcessor extends FeedsProcessor { /** * Implementation of FeedsProcessor::clear(). - * @todo: use batch API. */ - public function clear(FeedsSource $source) { - // Count number of deleted nodes. - $deleted = 0; - - $result = db_query('SELECT nid FROM {feeds_node_item} WHERE feed_nid = %d', $source->feed_nid); + public function clear(FeedsBatch $batch, FeedsSource $source) { + $result = db_query_range('SELECT nid FROM {feeds_node_item} WHERE feed_nid = %d', $source->feed_nid, 0, variable_get('feeds_node_batch_size', 1)); while ($node = db_fetch_object($result)) { _feeds_node_delete($node->nid); - $deleted++; + $batch->deleted++; + } + + // If there is still content to be deleted return FALSE. + if (db_result(db_query_range('SELECT nid FROM {feeds_node_item} WHERE feed_nid = %d', $source->feed_nid, 0, 1))) { + return FEEDS_BATCH_ACTIVE; } // Set message. - if ($deleted) { - drupal_set_message(t('Deleted !number nodes.', array('!number' => $deleted))); + if ($batch->deleted) { + drupal_set_message(t('Deleted !number nodes.', array('!number' => $batch->deleted))); } else { drupal_set_message(t('There is no content to be deleted.')); diff --git a/plugins/FeedsProcessor.inc b/plugins/FeedsProcessor.inc index 202b488..3aae33c 100644 --- a/plugins/FeedsProcessor.inc +++ b/plugins/FeedsProcessor.inc @@ -25,6 +25,8 @@ abstract class FeedsProcessor extends FeedsPlugin { * Remove all stored results or stored results up to a certain time for this * configuration/this source. * + * @param FeedsBatch $batch + * A FeedsBatch object for keeping information between page loads. * @param FeedsSource $source * Source information for this expiry. Implementers should only delete items * pertaining to this source. The preferred way of determining whether an @@ -32,8 +34,11 @@ abstract class FeedsProcessor extends FeedsPlugin { * processor's responsibility to store the feed_nid of an imported item in * the processing stage. * @todo: pass in feed_nid instead of source? + * + * @return + * NULL if all items have been cleared, FEEDS_BATCH_ACTIVE if not. */ - public abstract function clear(FeedsSource $source); + public abstract function clear(FeedsBatch $batch, FeedsSource $source); /** * Delete feed items younger than now - $time. @@ -44,8 +49,12 @@ abstract class FeedsProcessor extends FeedsPlugin { * If implemented, all items produced by this configuration that are older * than FEEDS_REQUEST_TIME - $time * If $time === NULL processor should use internal configuration. + * + * @return + * NULL if all items have been expired, FEEDS_BATCH_ACTIVE if not. */ - public function expire($time = NULL) {} + public function expire($time = NULL) { + } /** * Execute mapping on an item. diff --git a/plugins/FeedsTermProcessor.inc b/plugins/FeedsTermProcessor.inc index 85f9021..d9cf29f 100644 --- a/plugins/FeedsTermProcessor.inc +++ b/plugins/FeedsTermProcessor.inc @@ -74,11 +74,13 @@ class FeedsTermProcessor extends FeedsProcessor { /** * Implement clear. * - * @param $source + * @param FeedsBatch $batch + * A FeedsBatch object for keeping information between page loads. + * @param FeedsSource $source * FeedsSource of this term. FeedsTermProcessor does not heed this * parameter, it deletes all terms from a vocabulary. */ - public function clear(FeedsSource $source) { + public function clear(FeedsBatch $batch, FeedsSource $source) { $deleted = 0; $result = db_query('SELECT tid FROM {term_data} WHERE vid = %d', $this->config['vocabulary']); diff --git a/plugins/FeedsUserProcessor.inc b/plugins/FeedsUserProcessor.inc index 4fc7553..2c5559b 100644 --- a/plugins/FeedsUserProcessor.inc +++ b/plugins/FeedsUserProcessor.inc @@ -68,11 +68,13 @@ class FeedsUserProcessor extends FeedsProcessor { /** * Implement clear. * - * @param $source + * @param FeedsBatch $batch + * A FeedsBatch object for keeping information between page loads. + * @param FeedsSource $source * FeedsSource of this term. FeedsTermProcessor does not heed this * parameter, it deletes all terms from a vocabulary. */ - public function clear(FeedsSource $source) { + public function clear(FeedsBatch $batch, FeedsSource $source) { // Do not support deleting users as we have no way of knowing which ones we // imported. throw new Exception(t('User processor does not support deleting users.'));