Note: klausi's original issue summary below -eosrei
Ever had to build a migration against live data? Me too! Source records are added, updated and even deleted. The migrate module currently only supports new and updated source records. Here is a patch to fix that! Plus, some other related things...

Usage
If you are using --update to update all destination objects and want to remove any un-updated objects (OP's original request):

drush migrate-import --all --update
drush migrate-rollback --all --needs-update

If you are using track_changes to update destination objects with changed source data and want to remove any unprocessed objects:

drush migrate-import --all --track-unprocessed
drush migrate-rollback --all --unprocessed

If you are using a highwater field to update destination objects with changed source data and want to remove any unprocessed objects:

drush migrate-import --all --track-unprocessed
drush migrate-rollback --all --unprocessed --do-not-reset-highwater 

Features
The following features are implemented:

  • Separated needs_update flag data from row import status, includes hook_update_7208().
  • Adds two row status values: STATUS_SKIPPED and STATUS_UNPROCESSED. Definitions: STATUS_IGNORED means the row has been ignored by a prepareRow() function, STATUS_SKIPPED means the row has been skipped because of the id list, highwater field, or tracked changes.
  • Displays skipped record statistics after import
    Processed 19299 (3 created, 4 updated, 0 failed, 0 ignored, 19292 skipped) in 24.6 sec (17/min) - done with 'Example'
    
  • Implement roll back of both unprocessed and needs-update objects.
  • New tests for track_changes, skipped records, and rollback unprocessed with 17 new assertions.

Adds a new option to drush migrate-import:

 --track-unprocessed                       Set the status of all mapped rows to 
                                           unprocessed. When the import process 
                                           is complete, any unprocessed records 
                                           can be considered missing from the   
                                           source.

Adds three new options to drush migrate-rollback:

 --do-not-reset-highwater                  Do not reset the highwater field     
                                           value after the roll back. Generally 
                                           used with partial roll backs.        
 --needs-update                            Only roll back objects marked        
                                           needs-update                         
 --unprocessed                             Only roll back objects with a status 
                                           of unprocessed.

Notes
I've tried to reduce the overall amount of changed lines, which is why I used migrate-rollback instead of a new drush command such as migrate-purge.

--needs-update and --unprocessed are not compatible with --idlist (obviously) during rollback. They will override anything set it using --idlist.

For sanity the syntax of the saveIDMapping() function has been changed to include separate import_stats and needs_update values. I've checked the following contrib modules: commerce_migrate, g2migrate, migrate_d2d, migrate_extras phpbb2drupal, TYPO3_migrate, and wordpress_migrate. Only the current dev of migrate_d2d uses the saveIDMapping() function, so a small patch is required for it. Additionally, variable defaults now come from the database defaults rather than function and database defaults. If a passed value is NULL, it will not be updated in the database. No more losing your $hash value to set a $rollback_action.

Original:

  public function saveIDMapping(stdClass $source_row, array $dest_ids,
      $needs_update = MigrateMap::STATUS_IMPORTED,
      $rollback_action = MigrateMap::ROLLBACK_DELETE, $hash = NULL)

New:

  public function saveIDMapping(stdClass $source_row, array $dest_ids,
      $import_status = NULL, $needs_update = NULL, $rollback_action = NULL,
      $hash = NULL)

I can submit this as multiple patches if it aids review. It was created in five commits against the migrate.git.

Per mikeryan's comment in #5, "This would absolutely have to be an optional, non-default action" There is no noticable change to any existing functionality. All new functionality is completely optional.

OP issue summary:

Use case: User migration from an old platform, during the staging phase of the project I don't want to rollback and re-import users all the time. Users can get deleted on the old platform in the meantime, so when I run migrate-import --update the already imported users that do not exist anymore should also get deleted.

There is a comment in #1222454-3: migrating users with --update fails that says that "in particular, any destination-side data not in the source data will be wiped on --update". But this does not work for me. The records in the migrate_map table are only set to needs_update=1, but the Drupal 7 users do not get deleted.

Is there any other command I must run to delete the users? Do I have to write a script myself that extracts the destids that are set to needs_update=1 and invoke user_delete_multiple()?

Support from Acquia helps fund testing for Drupal Acquia logo

Comments

klausi’s picture

The workaround in my migration class:

  /**
   * Remove users that were deleted in the source.
   */
  public function postImport() {
    // Get uids to be deleted. They are set to "needs_update=1" although the
    // whole migration has run through.
    $uids = db_select('migrate_map_myuser', 'm')
      ->fields('m', array('sourceid1', 'destid1'))
      ->condition('needs_update', 1)
      ->execute()
      ->fetchAllKeyed(0, 1);
    $delete = array();
    foreach ($uids as $source => $dest) {
      $exists = Database::getConnection('default', 'old_db')
        ->select('users', 'u')
        ->fields('u', array('uid'))
        ->condition('uid', $source)
        ->execute()
        ->fetchAll();
      if (empty($exists)) {
        $delete[] = $dest;
        $this->getMap()->delete(array($source));
      }
    }
    user_delete_multiple($delete);
  }

Performs pretty much ok (takes a few additional seconds at the end of the migration) on 180 obsolete users out of 23,000.

mikeryan’s picture

Category: bug » feature

This is not a supported feature - the line you quote was speaking of the fields within a migrated item, it did not mean that items disappearing from the source data would magically be deleted from the destination side.

nerdcore’s picture

This would be a pretty great feature. It follows naturally from the discussion of highwater marks and updating of records - keeping data consistent between source and destination...

Wherever this is done in the Migration class (whether postImport() or another new function), the class itself has reference to the map table for that migration and its source key from whichever table was used for source data.

The Migration class could either iterate though all records in postImport(), LEFT JOINing the map table to the source data, and if the source ID is NULL, delete the record from the destination.

OR the Migration class could keep track of all source records which already have destinations, then in postImport() simply compare this list of source IDs to the map table, deleting records which are in the map and not in the internal list.

I just tried to var_dump($this) in postImport() to see if I could discern the structure of the source/dest/map data in my Migration class but I was overwhelmed with data. It would be nice to not have to re-iterate all of the source IDs in postImport() if possible.

Steven Jones’s picture

So, we need importing and deleting for importing and migrating etc.


protected function beginProcess($newStatus) {
    parent::beginProcess($newStatus);

    // Now force an update process.
    if ($newStatus == MigrationBase::STATUS_IMPORTING) {
      $this->prepareUpdate();
    }
  }


  public function endProcess() {
    if ($this->status == MigrationBase::STATUS_IMPORTING) {

      $idlist = $this->getOption('idlist');
      if ($idlist) {
        // Make the IDs keys, to more easily identify them
        $idlist = array_flip(explode(',', $idlist));
      }

      $destids = array();
      $sourceids = array();

      // Delete records that we not imported this time around:
      // They are set to "needs_update=1" although the
      // whole migration has run through.
      $rows_to_delete = $this->map->getRowsNeedingUpdate(1000000);
      foreach ($rows_to_delete as $destination_key) {

        // Note that bulk rollback is only supported for single-column keys
        if (!empty($destination_key->destid1)) {
          $destids[] = $destination_key->destid1;
        }
      }
      if (!empty($destids)) {
        migrate_instrument_start('destination bulkRollback');
        $this->destination->bulkRollback($destids);
        migrate_instrument_stop('destination bulkRollback');
      }
      foreach ($destids as $id) {
        $this->map->deleteDestination(array($id));
      }
    }


    // Now call our parent.
    parent::endProcess();
  }

So you can do something like this, to force the update setting and to delete items after import. This has basically been ripped out of Migration::rollback()

mikeryan’s picture

Version: 7.x-2.2 » 7.x-2.x-dev

This would absolutely have to be an optional, non-default action - searching for missing records is an expensive action. I'm not sure it even belongs as part of the import step - I'd be more comfortable with it as a separate action (drush migrate-purge or somesuch).

nerdcore’s picture

I think having a flag that we could set on the custom Migration subclass would be ideal. It is definitely going to be expensive and should be off by default. But for my $0.02 I think something like:

$this->deleteWhenSourceDeleted = TRUE;

would be great.

I'm currently dealing with a case where migrations are being fired through cron and using a highwater mark to continuously update from new source data, but sometimes a source record gets deleted and we need to keep it in sync (not just add).

Dentorat’s picture

Steven, where did you put this? Thanks

Steven Jones’s picture

@penthehuman pop that code in your main Migration subclass.

neruda001’s picture

Hi,

in the case you have custom entity in Drupal and data from another mysql DB or from several tables in the same db that you want to import as the entity content.

The issue of deleting non-existent source record previously migrated made me thinking a little and then I opted to develop in postMigration() method the algorithm to do that.

Assuming that the query to get the data maybe a little bit complicated, so I want to describe it once in migration subclass.
For the MigrationSqlSource there was a non implemented method originalQuery() so it got me an error when trying to get the originalQuery object in the postImport() using $this->source->query(); so I had to implement a new method in sql.inc where this file reside in /plugins/sources of the migrate module ( ver 2.5 ). Discussion https://drupal.org/node/1799964.
So the new method in sql.inc called for example getQueryCopy() is like this:

public function getQueryCopy(){
return clone ($this->originalQuery);
}

and in the postImport method of the migration class now you can get the query object value without having a dangerous reference to it, this is because later you need to edit this object.

function postImport(){
  $entityType = $this->destination->getEntityType();
  $connection;
  $transaction=db_transaction();
  try{
  $connection = Database::getConnection();
  $not_exists_query = $this->source->getQueryCopy();
  $not_exists_query->condition('sourcekey1',map.sourceid1)
->condition('sourcekey2',map.sourceid2)
->condition('sourcekey3',map.sourceid3);
  

//Get the rows in the migrate mapping which are not present in source table.
  $rows_to_delete = $connection
          ->select('migrate_map_entity_name','map')
          ->fields('map',array('sourceid1','sourceid2','sourceid3'))  
          ->notExists($not_exists_query)
          ->execute()->fetchAll();
  
  $countDeleted=0;
  
  foreach($rows_to_delete as $delete_me){
    $map_row = $this->getMap()->getRowBySource(array($delete_me->sourceid1,$delete_me->sourceid2,$delete_me->sourceid3));
   
    $deletion = hook_entity_name_delete($map_row['destid1']); //DEL the Entity

    $this->getMap()->delete(array($delete_me->sourceid1,$delete_me->sourceid2,$delete_me->sourceid3));  // DEL the mapping record
      
    //be free to use more efficient way to check if the rows get really deleted. :)
    $checkMap = $this->getMap()->getRowByDestination(array($map_row['destid1'])); 
    $checkEntity = hook_entity_name_load($map_row['destid1']);

    $message = "sourcekey1: ".$delete_me->sourceid1."; sourcekey2: ".$delete_me->sourceid2.
                                    "; sourcekey3: ".$delete_me->sourceid3;
    if($checkMap){
        $message.=" - Cannot delete Map record.";
      }
      if ($checkEntity){
        $message.=" - Cannot delete Entity record.";
      }
      
    if($checkMap || $checkEntity){
      $transaction->rollback();
      drupal_set_message($entityType." Error deleting: ".$message,'error');
      watchdog($entityType." migration",$entityType." Error deleting ".$message,array(),WATCHDOG_ERROR);
    }else{
      drupal_set_message($entityType." DELETED; ".$message,'warning');
      $countDeleted++;
    }

    
  }
  $connection=null;
  if($countDeleted>0){
    drupal_set_message($entityType."Total deleted: ".$countDeleted,'warning');
  }
  
  }
    catch(Exception $ex){
      $transaction->rollback();
      $connection=null;
      drupal_set_message($entityType."Errore in postImport: ".$ex->getMessage(),'error');
      watchdog_exception('module_name', $ex, "Error in postImport ".$entityType.": ".$ex->getMessage());
    }
}

just run migrate import to delete non-existent source record from destination and mapping table of the Drupal db.

13rac1’s picture

Title: Delete non-existing source data on migrate-import --update » Delete destination objects when source data no longer exists
Assigned: Unassigned » 13rac1
Issue summary: View changes
Status: Active » Needs review
FileSize
43.87 KB

Rewrote original issue summary. See above. Comment below.

13rac1’s picture

Issue summary: View changes

Adding definitions above.

Here are sanitized results from my current project after resyncing the database, applying this patch, and running Drupal updates:

user@server /var/www/example $ drush migrate-import --all --track-unprocessed
Processed 19299 (3 created, 4 updated, 0 failed, 0 ignored, 19292 skipped) in 25.5 sec (16/min) - done with 'Creators' [completed]
Processed 119030 (0 created, 596 updated, 0 failed, 0 ignored, 118434 skipped) in 993.3 sec (36/min) - done with 'Works' [completed]
user@server /var/www/example $ drush migrate-rollback --all --unprocessed
Rolled back 486 unprocessed objects in 77.1 sec (378/min) - done with 'Works' [completed]
Rolled back 1 unprocessed objects in 5.4 sec (11/min) - done with 'Creators' [completed]
13rac1’s picture

Issue summary: View changes

Adding note.

13rac1’s picture

Issue summary: View changes
FileSize
38.96 KB

Simplified patch to remove unnecessary code optimization to reduce patch complexity.

13rac1’s picture

Title: Delete destination objects when source data no longer exists » Delete destination when source data is removed (with new tests)

Adjusting title for clarity.

joachim’s picture

Will this affect migrations that specifically expect to have source records go missing?

For example, I'm working with migrations where users upload new source files to add to a migration. In that case, the migration map table will find that it has records that no longer have matching data in the source. See #2223175: using Migrate UI wizard to add to existing source data / use same map tables and https://drupal.org/sandbox/joachim/2231649

13rac1’s picture

@joachim: Are you asking if this patch will break existing migrations? Of course not. This is an optional new feature. If that isn't your question, can you rephrase? Thanks

joachim’s picture

Yup, that was my question (well, and future migrations too!). And I'm glad to hear that, thanks! :)

13rac1’s picture

FileSize
38.96 KB

Updating patch for recent commits to 7.x-2.x-dev. The patch includes 5kb of tests.

165 passes, 0 fails, 0 exceptions, and 4 debug messages
13rac1’s picture

Title: Delete destination when source data is removed (with new tests) » Optionally delete destination when source data is removed (with tests)
FileSize
39.2 KB

Minor update to reset skipped item count when ignored item count is reset.

13rac1’s picture

NULL

mikeryan’s picture

Status: Needs review » Needs work

There's way too much going on in this patch, well beyond the original scope of providing a means to remove data no longer in the source.

13rac1’s picture

Ah ha! Thank you for responding. I'll break this into multiple dependent issues.

How should "skipped" rows be accounted for? Should they be combined with "ignored" rows? That could reduce the number of code changes.

The first related issue #2249191: needs_update in the migrate_map_% tables is named/used inconsistently

Status: Needs work » Needs review

Status: Needs review » Needs work

The last submitted patch, 19: migrate-rollback-unprocessed-1416672-19.patch, failed testing.

daniroyo’s picture

Hello!

¿In which state is this feature?

Is it possible to use it?

mikeryan’s picture

As the issue status says, the patch needs work (i.e., it needs first of all to be reduced to the specific issue without extraneous refactoring).

cristiroma’s picture

Using @klausi method in #1 to delete in postImport works if you do the import from command line in one shot. However, sometimes we import large amount of data in a cron task with something like: drush mi WebServiceData --limit="30 seconds", therefore postImport is executed every time, so it's not feasible.

I have written a custom drush command to remove orphans, which can be run independently on the actual migration.

@mikeryan - Would be useful to implement a founction in the MigrateMap/MigrateSQLMap to retrieve all records? Or is anyhthing I miss? Would be simpler than query migrate_map_migration table using db_select, to get the records. $migration->getMap()->getMappings()

Code looks something like this:

function hook_drush_command() {
  return array(
    'migrate-purge-orphans' => array(
      'description' => 'Check remote source for orphaned records (removed) and remove them locally',
      'arguments' => array(
        'migration-name' => 'Name of the migration to check',
      ),
      'required-arguments' => 1,
      'options' => array(
        'dry-run' => 'Do not delete, just report the records',
      ),
    )
  );
}

function drush_module_migrate_purge_orphans($migration_name) {
  // This is the source data wrapper which is able to get a sourceid1 and tell if it exists or not, see itemExists below.
  $ds = new SourceWebService();

  // https://www.drupal.org/node/1416672
  /** @var Migration $migration */
  $migration = Migration::getInstance(drupal_strtolower($migration_name));
  if (!$migration) {
    drush_set_error('No migration found:' . $migration_name);
    return;
  }
  if ($migration->getStatus() != MigrationBase::STATUS_IDLE) {
    drush_set_error('Migration not idle, waiting to finish. Status is: ' . $migration->getStatus());
    return;
  }
  $rows = db_select('migrate_map_' . $migration->getMachineName(), 'm')
    ->fields('m', array('sourceid1', 'destid1'))
    ->isNotNull('destid1')
    ->execute()
    ->fetchAllKeyed(0, 1);
  $count = count($rows);
  $dry_run = drush_get_option('dry-run');

  $delete_nids = array();
  $delete_src = array();
  foreach ($rows as $source_id => $nid) {
    if (!$ds->itemExists($source_id)) {
      $delete_src[] = $source_id;
      $delete_nids[] = $nid;
    }
  }
  if (!$dry_run) {
    node_delete_multiple($delete_nids);
    foreach($delete_src as $id) {
      $migration->getMap()->delete(array($id));
    }
  }
}
dshumaker’s picture

I have the same issue of a missing entry in the source indicates a "possible" entry/node to delete. I looked at several ways of doing it and what I landed on was creating a mysql table called "keep_list" which is a fresh (emptied after each import) table created specifically for the purpose of indicating which entries are in the XML source. I thought about re-purposing the "needs-update" column but any re-purposing is potentially problematic because it's not what the author had in mind for the field/column. Indeed I found that after initial creation of a node the needs_update flag was triggered because it was assumed drupal needed to put "true" values into the fields (which was not the case, in my case, so I forced needs_udpate to be 0 on initial creation).

Then after my import, in the postImport member I loop through the results of a query that indicates which entries were not visited and I set their status to 0 (unpublished).
Then later on I can go back and remove unpublished nodes based on other criteria if need be (via a new Migration class pass).

In light of Klaus' comment on his patch, ""I've tried to reduce the overall amount of changed lines, which is why I used migrate-rollback instead of a new drush command such as migrate-purge.", I don't believe we should re-purpose rollback just because we don't want to create a new command line parameter "--remove-missing-source" or write a new MigrationClass pass that is specifically designed for deletes alltogether. Indeed it seems like creating separate Migration classes is the best practice of sorts.

Basically, semantically, rollback does not mean delete and I don't think we should go that direction because it's un-intuitive. To me, rollback means undo changes and that does seem intuitive.

0.02 cents.

anrikun’s picture

I fully agree with dshumaker above (#28): rollback does not mean delete and I don't think we should go that direction because it's un-intuitive.

Gomez_in_the_South’s picture

As discussed above, the postImport will be run at the end of every batch. This usually happens if called via the Migrate GUI or running drush with a time limit (either via parameter or CLI max_execution_time set).
If drush approaches the PHP memory limit, then it will also batch the migration in order to free up memory. This is another scenario to be aware of.
Therefore a solution as posted in #27 may be more suitable if you need to reliably run a function only after the entire migration has completed.

Sample output from migration logs demonstrating this:

out: Memory usage is 435.22 MB (85% of limit 512 MB), resetting statics     [warning]
out: Memory usage is now 405.38 MB (79% of limit 512 MB), not enough        [warning]
out: reclaimed, starting new batch
out: Processed 8 (0 created, 8 updated, 0 failed, 0 ignored) in 21.2 sec     [status]
out: (23/min) - continuing with 'priceMigration'
out: Migration priceMigration processed 8 items, returned 0 errors               [success]
out: Processed 34 (0 created, 34 updated, 0 failed, 0 ignored) in 57.8 sec[completed]
out: (35/min) - done with 'priceMigration'
GeduR’s picture

Hi all, I've merged some of the previous code to a module (sandbox), any help is welcome.

Maybe after further exploration it could be merged with the main module.

https://www.drupal.org/sandbox/gedur/2873105

Hope it helps

cosmicdreams’s picture

Should this issue be assigned to migrate_tools? They already have a concept of unprocessed files that is derived from calculating how many records are coming in from the source vs how many records exist in the local Drupal site. No real "tracking" is happening there but perhaps this ticket will add that.

cosmicdreams’s picture

Also, it appears this change targets D7, should we target D8 first?