? .git ? .gitignore ? 600584_batching.patch ? libraries/simplepie.inc Index: README.txt =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/README.txt,v retrieving revision 1.14 diff -u -p -r1.14 README.txt --- README.txt 10 Nov 2009 20:54:50 -0000 1.14 +++ README.txt 23 Nov 2009 03:16:52 -0000 @@ -121,6 +121,10 @@ Default: 200 Drupal Queue is enabled. http://drupal.org/project/drupal_queue +Name: feeds_node_batch_size +Default: 20 + The number of nodes feed node processor creates in one batch. + Glossary ======== Index: feeds.install =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/feeds.install,v retrieving revision 1.4 diff -u -p -r1.4 feeds.install --- feeds.install 20 Oct 2009 20:59:04 -0000 1.4 +++ feeds.install 23 Nov 2009 03:16:52 -0000 @@ -70,6 +70,12 @@ function feeds_schema() { 'not null' => TRUE, 'description' => t('Main source resource identifier. E. g. a path or a URL.'), ), + 'cache' => array( + 'type' => 'text', + 'not null' => FALSE, + 'description' => t('Flexible cache attached to source. Used e. g. for batching.'), + 'serialize' => TRUE, + ), ), 'primary key' => array('id', 'feed_nid'), 'indexes' => array( @@ -291,4 +297,21 @@ function feeds_update_6006() { db_add_index($ret, 'feeds_schedule', 'feed_nid', array('feed_nid')); return $ret; +} + +/** + * Add cache field to source table. + */ +function feeds_update_6007() { + $ret = array(); + + $spec = array( + 'type' => 'text', + 'not null' => FALSE, + 'description' => t('Flexible cache attached to source. Used e. g. 
for batching.'), + 'serialize' => TRUE, + ); + db_add_field($ret, 'feeds_source', 'cache', $spec); + + return $ret; } \ No newline at end of file Index: feeds.module =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/feeds.module,v retrieving revision 1.20 diff -u -p -r1.20 feeds.module --- feeds.module 16 Nov 2009 14:52:38 -0000 1.20 +++ feeds.module 23 Nov 2009 03:16:52 -0000 @@ -377,6 +377,31 @@ function feeds_scheduler_work($feed_info } /** + * Batch callback. + * + * @param $method + * Method to execute on importer; one of 'import' or 'clear' or 'expire'. + * @param $id + * Identifier of an importer object. + * @param $feed_nid + * If importer is attached to content type, feed node id identifying the + * source to be imported. + * @param $context + * Batch context. + */ +function feeds_batch($method, $id, $feed_nid = 0, &$context) { + switch ($method) { + case 'import': + $importer = feeds_importer($id); + $source = feeds_source($importer, $feed_nid); + $batch_status = $importer->import($source); + $context['finished'] = ($batch_status != FEEDS_BATCH_ACTIVE); + break; + } + +} + +/** * @} End of "defgroup hooks". */ Index: feeds.pages.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/feeds.pages.inc,v retrieving revision 1.5 diff -u -p -r1.5 feeds.pages.inc --- feeds.pages.inc 21 Oct 2009 23:18:30 -0000 1.5 +++ feeds.pages.inc 23 Nov 2009 03:16:52 -0000 @@ -87,7 +87,13 @@ function feeds_import_form_submit($form, // Refresh feed if import on create is selected. if ($importer->config['import_on_create']) { - $importer->import($source); + $batch = array( + 'title' => t('Importing'), + 'operations' => array( + array('feeds_batch', array('import', $importer->id, 0)), + ), + ); + batch_set($batch); } // Add importer to schedule.
Index: includes/FeedsImporter.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/includes/FeedsImporter.inc,v retrieving revision 1.4 diff -u -p -r1.4 FeedsImporter.inc --- includes/FeedsImporter.inc 21 Oct 2009 22:49:47 -0000 1.4 +++ includes/FeedsImporter.inc 23 Nov 2009 03:16:52 -0000 @@ -10,6 +10,10 @@ require_once(dirname(__FILE__) .'/FeedsConfigurable.inc'); require_once(dirname(__FILE__) .'/FeedsSource.inc'); +// Batch status of a FeedsImporter operation. +define('FEEDS_BATCH_ACTIVE', 'active'); +define('FEEDS_BATCH_COMPLETE', 'complete'); + /** * A Feeds result class. * @@ -23,7 +27,7 @@ abstract class FeedsResult { // The type of this result. protected $type; // The value of this result. - protected $value; + public $value; /** * Constructor: create object, validate class variables. @@ -126,22 +130,39 @@ class FeedsImporter extends FeedsConfigu /** * Import a feed: execute, fetching, parsing and processing stage. * + * @todo: Add source-level locking to avoid race conditions between users / + * cron updating a source. + * @todo: Support batching. + * * @throws Exception * If a problem with fetching, parsing or processing occured. * @todo: Iron out and document potential Exceptions. - * @todo: Support batching. * @todo: catch exceptions outside of import(), clear() and expire(). + * + * @return + * FEEDS_BATCH_COMPLETE if all items have been imported, FEEDS_BATCH_ACTIVE + * if there are more to import. + * */ public function import(FeedsSource $source) { try { - $result = $this->fetcher->fetch($source); - $result = $this->parser->parse($result, $source); - $this->processor->process($result, $source); + // Only fetch and parse fresh data if there is no result in the batch + // cache. + if (!$result = $this->processor->cachedBatch($source)) { + $result = $this->fetcher->fetch($source); + $result = $this->parser->parse($result, $source); + } + + // Process result of parsing stage. 
The processor may support batching in + // which case process() may return FEEDS_BATCH_ACTIVE. + $batch_status = $this->processor->process($result, $source); } catch (Exception $e) { drupal_set_message($e->getMessage(), 'error'); } module_invoke_all('feeds_after_import', $this, $source); + + return $batch_status; } /** Index: includes/FeedsScheduler.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/includes/FeedsScheduler.inc,v retrieving revision 1.6 diff -u -p -r1.6 FeedsScheduler.inc --- includes/FeedsScheduler.inc 2 Nov 2009 20:40:22 -0000 1.6 +++ includes/FeedsScheduler.inc 23 Nov 2009 03:16:52 -0000 @@ -208,7 +208,14 @@ class FeedsScheduler implements IFeedsSc // There are 2 possible callbacks: expire or 'import'. if ($feed_info['callback'] == 'expire') { try { - $importer->expire(); + // @todo: support batching on cron by unflagging after importing. + // - unflag after import + // - set last_scheduled_time only if batch is complete. + // - will require to rename last_scheduled_time to last_import_time. + do { + $result = $importer->expire(); + } + while ($result != FEEDS_BATCH_COMPLETE); } catch (Exception $e) { watchdog('FeedsScheduler', $e->getMessage(), array(), WATCHDOG_ERROR); @@ -222,7 +229,11 @@ class FeedsScheduler implements IFeedsSc return; } try { - $importer->import($source); + // @todo: support batching on cron by unflagging after importing. 
+ do { + $result = $importer->import($source); + } + while ($result != FEEDS_BATCH_COMPLETE); } catch (Exception $e) { watchdog('FeedsScheduler', $e->getMessage(), array(), WATCHDOG_ERROR); Index: includes/FeedsSource.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/includes/FeedsSource.inc,v retrieving revision 1.1 diff -u -p -r1.1 FeedsSource.inc --- includes/FeedsSource.inc 20 Oct 2009 21:01:35 -0000 1.1 +++ includes/FeedsSource.inc 23 Nov 2009 03:16:52 -0000 @@ -80,6 +80,10 @@ class FeedsSource extends FeedsConfigura // The FeedsImporter object that this source is expected to be used with. protected $importer; + // Cache source-related information between page loads. Used for batching. + // For an example, see FeedsImporter::import(). + protected $cache; + /** * Instantiate a unique object per class/id/feed_nid. Don't use * directly, use feeds_source() instead. @@ -126,6 +130,7 @@ class FeedsSource extends FeedsConfigura 'feed_nid' => $this->feed_nid, 'config' => $config, 'source' => $source, + 'cache' => $this->cache, ); // Make sure a source record is present at all time, try to update first, // then insert. @@ -139,13 +144,14 @@ class FeedsSource extends FeedsConfigura * Load configuration and unpack. */ public function load() { - if ($config = db_result(db_query('SELECT config FROM {feeds_source} WHERE id = "%s" AND feed_nid = %d', $this->id, $this->feed_nid))) { + if ($record = db_fetch_object(db_query('SELECT config, cache FROM {feeds_source} WHERE id = "%s" AND feed_nid = %d', $this->id, $this->feed_nid))) { // While FeedsSource cannot be exported, we still use CTool's export.inc // export definitions. // @todo: patch CTools to move constants from export.inc to ctools.module. 
ctools_include('export'); $this->export_type = EXPORT_IN_DATABASE; - $this->config = unserialize($config); + $this->config = unserialize($record->config); + $this->cache = unserialize($record->cache); } } @@ -158,6 +164,13 @@ class FeedsSource extends FeedsConfigura } /** + * Set cache at key to given value. + */ + public function setCache($key, $value) { + $this->cache[$key] = $value; + } + + /** * Convenience function. Returns the configuration for a specific class. * * @param FeedsSourceInterface $client Index: plugins/FeedsDataProcessor.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/plugins/FeedsDataProcessor.inc,v retrieving revision 1.4 diff -u -p -r1.4 FeedsDataProcessor.inc --- plugins/FeedsDataProcessor.inc 2 Nov 2009 20:18:35 -0000 1.4 +++ plugins/FeedsDataProcessor.inc 23 Nov 2009 03:16:52 -0000 @@ -50,6 +50,8 @@ class FeedsDataProcessor extends FeedsPr else { drupal_set_message(t('There are no new items.')); } + + return FEEDS_BATCH_COMPLETE; } /** @@ -63,6 +65,8 @@ class FeedsDataProcessor extends FeedsPr ); $num = $this->handler()->delete($clause); drupal_set_message(t('Deleted !number items.', array('!number' => $num))); + + return FEEDS_BATCH_COMPLETE; } /** @@ -83,6 +87,8 @@ class FeedsDataProcessor extends FeedsPr ); $num = $this->handler()->delete($clause); drupal_set_message(t('Expired !number records from !table.', array('!number' => $num, '!table' => $this->tableName()))); + + return FEEDS_BATCH_COMPLETE; } /** Index: plugins/FeedsFeedNodeProcessor.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/plugins/FeedsFeedNodeProcessor.inc,v retrieving revision 1.4 diff -u -p -r1.4 FeedsFeedNodeProcessor.inc --- plugins/FeedsFeedNodeProcessor.inc 18 Nov 2009 16:53:48 -0000 1.4 +++ plugins/FeedsFeedNodeProcessor.inc 23 Nov 2009 03:16:52 -0000 @@ -57,6 +57,8 @@ class FeedsFeedNodeProcessor 
extends Fee else { drupal_set_message(t('There is no new content.')); } + + return FEEDS_BATCH_COMPLETE; } /** Index: plugins/FeedsNodeProcessor.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/plugins/FeedsNodeProcessor.inc,v retrieving revision 1.15 diff -u -p -r1.15 FeedsNodeProcessor.inc --- plugins/FeedsNodeProcessor.inc 18 Nov 2009 16:53:48 -0000 1.15 +++ plugins/FeedsNodeProcessor.inc 23 Nov 2009 03:16:52 -0000 @@ -17,9 +17,13 @@ class FeedsNodeProcessor extends FeedsPr public function process(FeedsParserResult $parserResult, FeedsSource $source) { // Count number of created and updated nodes. - $created = $updated = 0; + $cache = $source->cache; + $created = empty($cache['created']) ? 0 : $cache['created']; + $updated = empty($cache['updated']) ? 0 : $cache['updated']; + + $count = 0; + foreach ($parserResult->value['items'] as $k => $item) { - foreach ($parserResult->value['items'] as $item) { // If the target item does not exist OR if update_existing is enabled, // map and save. if (!($nid = $this->existingItemId($item, $source)) || $this->config['update_existing']) { @@ -48,6 +52,19 @@ class FeedsNodeProcessor extends FeedsPr $created++; } } + + // Unset items that have been processed. + unset($parserResult->value['items'][$k]); + + // Return if batch size was reached. + $count++; + if ($count >= variable_get('feeds_node_batch_size', 20)) { + $source->setCache('created', $created); + $source->setCache('updated', $updated); + $source->setCache('parser_result', $parserResult); + $source->setCache('batch_status', FEEDS_BATCH_ACTIVE); + return FEEDS_BATCH_ACTIVE; + } } // Set messages. @@ -60,6 +77,14 @@ class FeedsNodeProcessor extends FeedsPr else { drupal_set_message(t('There is no new content.')); } + + // Reset cache. 
+ $source->setCache('created', 0); + $source->setCache('updated', 0); + $source->setCache('parser_result', NULL); + $source->setCache('batch_status', FEEDS_BATCH_COMPLETE); + + return FEEDS_BATCH_COMPLETE; } /** @@ -83,6 +108,8 @@ class FeedsNodeProcessor extends FeedsPr else { drupal_set_message(t('There is no content to be deleted.')); } + + return FEEDS_BATCH_COMPLETE; } /** @@ -103,6 +130,15 @@ class FeedsNodeProcessor extends FeedsPr while ($node = db_fetch_object($result)) { _feeds_node_delete($node->nid); } + + return FEEDS_BATCH_COMPLETE; + } + + /** + * Return parser result currently being worked on. + */ + public function cachedBatch($source) { + return $source->cache['batch_status'] == FEEDS_BATCH_ACTIVE ? $source->cache['parser_result'] : NULL; } /** Index: plugins/FeedsProcessor.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/plugins/FeedsProcessor.inc,v retrieving revision 1.4 diff -u -p -r1.4 FeedsProcessor.inc --- plugins/FeedsProcessor.inc 31 Oct 2009 14:30:27 -0000 1.4 +++ plugins/FeedsProcessor.inc 23 Nov 2009 03:16:52 -0000 @@ -17,6 +17,10 @@ abstract class FeedsProcessor extends Fe * @param FeedsSource $source * Source information about this import. * + * @return + * FEEDS_BATCH_COMPLETE if all items have been processed, FEEDS_BATCH_ACTIVE + * if not. + * * @todo: Should it be execute()? */ public abstract function process(FeedsParserResult $parserResult, FeedsSource $source); @@ -32,6 +36,10 @@ abstract class FeedsProcessor extends Fe * processor's responsibility to store the feed_nid of an imported item in * the processing stage. * @todo: pass in feed_nid instead of source? + * + * @return + * FEEDS_BATCH_COMPLETE if all items have been cleared, FEEDS_BATCH_ACTIVE + * if not. 
*/ public abstract function clear(FeedsSource $source); @@ -44,8 +52,27 @@ abstract class FeedsProcessor extends Fe * If implemented, all items produced by this configuration that are older * than FEEDS_REQUEST_TIME - $time * If $time === NULL processor should use internal configuration. + * + * @return + * FEEDS_BATCH_COMPLETE if all items have been expired, FEEDS_BATCH_ACTIVE + * if not. + */ + public function expire($time = NULL) { + return FEEDS_BATCH_COMPLETE; + } + + /** + * Return open cached parser results. + * + * @return + * FeedsParserResult if there is an open batch being imported, NULL + * otherwise. + * + * @see FeedsNodeProcessor::cachedBatch(). */ - public function expire($time = NULL) {} + public function cachedBatch(FeedsSource $source) { + return NULL; + } /** * Execute mapping on an item. Index: plugins/FeedsTermProcessor.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/plugins/FeedsTermProcessor.inc,v retrieving revision 1.2 diff -u -p -r1.2 FeedsTermProcessor.inc --- plugins/FeedsTermProcessor.inc 2 Nov 2009 20:48:52 -0000 1.2 +++ plugins/FeedsTermProcessor.inc 23 Nov 2009 03:16:52 -0000 @@ -69,6 +69,8 @@ class FeedsTermProcessor extends FeedsPr else { drupal_set_message(t('There are no new terms.')); } + + return FEEDS_BATCH_COMPLETE; } /** @@ -97,6 +99,8 @@ class FeedsTermProcessor extends FeedsPr else { drupal_set_message(t('No terms to be deleted.')); } + + return FEEDS_BATCH_COMPLETE; } /**