From a0d1b7f5e153e298ed16225d12346d5f97fad4e2 Mon Sep 17 00:00:00 2001 From: PAC Date: Sun, 30 Jul 2023 15:33:30 +0200 Subject: [PATCH 1/5] Improve performance when using the --sync option. --- migrate_tools.install | 44 ++++++++ migrate_tools.module | 15 +-- migrate_tools.services.yml | 6 +- src/EventSubscriber/MigrationImportSync.php | 36 ++++--- src/MigrateTools.php | 108 ++++++++++++++++++++ 5 files changed, 188 insertions(+), 21 deletions(-) create mode 100644 migrate_tools.install diff --git a/migrate_tools.install b/migrate_tools.install new file mode 100644 index 0000000..87ad80a --- /dev/null +++ b/migrate_tools.install @@ -0,0 +1,44 @@ +<?php + +/** + * Implements hook_schema(). + */ +function migrate_tools_schema(): array { + $schema['migrate_tools_sync_source_ids'] = [ + 'description' => 'Table storing SyncSourceIds entries for the --sync option.', + 'fields' => [ + 'migration_id' => [ + 'description' => 'The migration ID.', + 'type' => 'varchar', + 'length' => 255, + 'not null' => TRUE, + ], + 'source_ids' => [ + 'description' => 'Array of source IDs, in the same order as defined in \Drupal\migrate\Row::$sourceIds.', + 'type' => 'blob', + // "normal" size for "blob" is 16KB on MySQL, and 4GB or unlimited on + // other DBMS. That should be more than enough for a single set of IDs. + // @see https://www.drupal.org/node/159605 + 'size' => 'normal', + 'not null' => TRUE, + 'serialize' => TRUE, + ], + ], + 'indexes' => [ + 'migration_id' => ['migration_id'], + ], + ]; + return $schema; +} + +/** + * Adds a table in the database dedicated to SyncSourceIds entries.
+ * + * @return void + */ +function migrate_tools_update_10000(): void { + $schema = migrate_tools_schema(); + foreach($schema as $tableName => $schemaDefinition) { + \Drupal::database()->schema()->createTable($tableName, $schemaDefinition); + } +} diff --git a/migrate_tools.module b/migrate_tools.module index 9e8efa0..64206d9 100644 --- a/migrate_tools.module +++ b/migrate_tools.module @@ -22,6 +22,7 @@ use Drupal\migrate_tools\Form\MigrationEditForm; use Drupal\migrate_tools\Form\MigrationGroupAddForm; use Drupal\migrate_tools\Form\MigrationGroupDeleteForm; use Drupal\migrate_tools\Form\MigrationGroupEditForm; +use Drupal\migrate_tools\MigrateTools; /** * Implements hook_entity_type_build(). @@ -54,7 +55,7 @@ function migrate_tools_entity_type_build(array &$entity_types): void { /** * Implements hook_menu_links_discovered_alter(). */ -function migrate_tools_menu_links_discovered_alter(&$links) { +function migrate_tools_menu_links_discovered_alter(&$links): void { if (\Drupal::moduleHandler()->moduleExists('migrate_plus')) { $links['migrate_tools.menu'] = [ 'title' => 'Migrations', @@ -81,19 +82,19 @@ function migrate_tools_migration_plugins_alter(array &$migrations): void { /** * Implements hook_migrate_prepare_row(). * + * @throws \Exception + * * @see \Drupal\migrate_tools\EventSubscriber\MigrationImportSync * @see \Drupal\migrate\Plugin\migrate\source\SourcePluginBase::next() */ -function migrate_tools_migrate_prepare_row(Row $row, MigrateSourceInterface $source, MigrationInterface $migration) { +function migrate_tools_migrate_prepare_row(Row $row, MigrateSourceInterface $source, MigrationInterface $migration): void { if (!empty($migration->syncSource)) { // Keep track of all source rows here, as SourcePluginBase::next() might // skip some rows, and we need them all to detect missing items in source to // delete in destination. 
- $source_id_values = \Drupal::state()->get('migrate_tools_sync', []); - - $source_id_values[] = $row->getSourceIdValues(); - - \Drupal::state()->set('migrate_tools_sync', $source_id_values); + /** @var MigrateTools $migrateTools */ + $migrateTools = \Drupal::service('migrate_tools.migrate_tools'); + $migrateTools->addToSyncSourceIds($migration->getPluginId(), $row->getSourceIdValues()); } } diff --git a/migrate_tools.services.yml b/migrate_tools.services.yml index 9aeebf4..e00705d 100644 --- a/migrate_tools.services.yml +++ b/migrate_tools.services.yml @@ -19,7 +19,7 @@ services: - { name: event_subscriber } arguments: - '@event_dispatcher' - - '@state' + - '@migrate_tools.migrate_tools' plugin.manager.migrate_shared_config: class: Drupal\migrate_tools\MigrateSharedConfigPluginManager @@ -28,3 +28,7 @@ services: migrate_tools.shared_config_include_handler: class: Drupal\migrate_tools\MigrateIncludeHandler arguments: ['@plugin.manager.migrate_shared_config'] + + migrate_tools.migrate_tools: + class: Drupal\migrate_tools\MigrateTools + arguments: ['@database'] diff --git a/src/EventSubscriber/MigrationImportSync.php b/src/EventSubscriber/MigrationImportSync.php index 5e40547..681c2d5 100644 --- a/src/EventSubscriber/MigrationImportSync.php +++ b/src/EventSubscriber/MigrationImportSync.php @@ -11,6 +11,7 @@ use Drupal\migrate\Event\MigrateRollbackEvent; use Drupal\migrate\Event\MigrateRowDeleteEvent; use Drupal\migrate\Plugin\MigrationInterface; use Drupal\migrate_plus\Event\MigrateEvents as MigratePlusEvents; +use Drupal\migrate_tools\MigrateTools; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\EventDispatcher\EventSubscriberInterface; @@ -20,26 +21,17 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface; class MigrationImportSync implements EventSubscriberInterface { protected EventDispatcherInterface $dispatcher; - - /** - * The state key/value store. 
- * - * @var \Drupal\Core\State\StateInterface - */ - protected StateInterface $state; + protected MigrateTools $migrateTools; /** * MigrationImportSync constructor. * * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $dispatcher * The event dispatcher. - * @param Drupal\Core\State\StateInterface $state - * The Key/Value Store to use for tracking synced source rows. */ - public function __construct(EventDispatcherInterface $dispatcher, StateInterface $state) { + public function __construct(EventDispatcherInterface $dispatcher, MigrateTools $migrateTools) { $this->dispatcher = $dispatcher; - $this->state = $state; - $this->state->set('migrate_tools_sync', []); + $this->migrateTools = $migrateTools; } /** @@ -48,6 +40,7 @@ class MigrationImportSync implements EventSubscriberInterface { public static function getSubscribedEvents(): array { $events = []; $events[MigrateEvents::PRE_IMPORT][] = ['sync']; + $events[MigrateEvents::POST_IMPORT][] = ['cleanSyncData']; return $events; } @@ -56,10 +49,15 @@ class MigrationImportSync implements EventSubscriberInterface { * * @param \Drupal\migrate\Event\MigrateImportEvent $event * The migration import event. + * + * @throws \Exception */ public function sync(MigrateImportEvent $event): void { $migration = $event->getMigration(); if (!empty($migration->syncSource)) { + $migrationId = $migration->getPluginId(); + // Clear Sync IDs for this migration before starting preparing rows. + $this->migrateTools->clearSyncSourceIds($migrationId); // Loop through the source to register existing source ids. // @see migrate_tools_migrate_prepare_row(). 
@@ -71,7 +69,7 @@ class MigrationImportSync implements EventSubscriberInterface { $source->next(); } - $source_id_values = $this->state->get('migrate_tools_sync', []); + $source_id_values = $this->migrateTools->getSyncSourceIds($migrationId); $id_map = $migration->getIdMap(); $id_map->rewind(); @@ -107,6 +105,18 @@ class MigrationImportSync implements EventSubscriberInterface { } } + /** + * Cleans Sync data after a migration is complete. + * + * @param \Drupal\migrate\Event\MigrateImportEvent $event + * The migration import event. + */ + public function cleanSyncData(MigrateImportEvent $event): void { + $migration = $event->getMigration(); + $migrationId = $migration->getPluginId(); + $this->migrateTools->clearSyncSourceIds($migrationId); + } + /** * Dispatches MigrateRowDeleteEvent event. * diff --git a/src/MigrateTools.php b/src/MigrateTools.php index d766f47..ab0ee96 100644 --- a/src/MigrateTools.php +++ b/src/MigrateTools.php @@ -4,6 +4,8 @@ declare(strict_types = 1); namespace Drupal\migrate_tools; +use Drupal\Core\Database\Connection; + /** * Utility functionality for use in migrate_tools. */ @@ -14,6 +16,24 @@ class MigrateTools { */ public const DEFAULT_ID_LIST_DELIMITER = ':'; + protected const MAX_BUFFERED_SYNC_SOURCE_IDS_ENTRIES = 1000; + protected array $bufferedSyncIdsEntries = []; + + /** + * Connection to the database. + */ + public Connection $connection; + + /** + * MigrateTools constructor. + * + * @param Connection $connection + * Connection to the database. + */ + public function __construct(Connection $connection) { + $this->connection = $connection; + } + /** * Build the list of specific source IDs to import. * @@ -37,4 +57,92 @@ class MigrateTools { return $id_list; } + /** + * Clears all SyncSourceIds entries from the database, for given migration. + * + * @param string $migrationId + * Migration ID. 
+ */ + public function clearSyncSourceIds(string $migrationId): void + { + $query = $this->connection->delete('migrate_tools_sync_source_ids') + ->condition('migration_id', $migrationId); + $query->execute(); + } + + /** + * Buffers a SyncSourceIds entry for the given migration. + * + * @param string $migrationId + * Migration ID. + * @param array $sourceIds + * A set of SyncSourceIds. Gets serialized to retain its structure. + * + * @throws \Exception + */ + public function addToSyncSourceIds(string $migrationId, array $sourceIds): void + { + $this->bufferedSyncIdsEntries[] = [ + 'migration_id' => $migrationId, + // Serialize source IDs before saving them to retain their structure. + 'source_ids' => serialize($sourceIds), + ]; + if (count($this->bufferedSyncIdsEntries) >= static::MAX_BUFFERED_SYNC_SOURCE_IDS_ENTRIES) { + $this->flushSyncSourceIdsToDatabase(); + } + } + + /** + * Flushes any pending SyncSourceIds to the database. + * + * @throws \Exception + */ + protected function flushSyncSourceIdsToDatabase(): void { + if (empty($this->bufferedSyncIdsEntries)) { + // Nothing to flush, do nothing. + return; + } + + // Batch insert all buffered pending entries. + $query = $this->connection->insert('migrate_tools_sync_source_ids') + ->fields(['migration_id', 'source_ids']); + foreach($this->bufferedSyncIdsEntries as $entry) { + $query->values($entry); + } + $query->execute(); + + // Clear buffered pending entries. + $this->bufferedSyncIdsEntries = []; + } + + /** + * Returns all SyncSourceIds from the database, for the given migration. + * + * @param string $migrationId + * Migration ID. + * + * @return array + * Ids, structured as they were inserted. + * + * @throws \Exception + */ + public function getSyncSourceIds(string $migrationId): array + { + // Ensure all buffered data is flushed to the database before reading it. + $this->flushSyncSourceIdsToDatabase(); + + // Retrieve all IDs.
+ $serializedSourceIds = $this->connection->query( + 'SELECT source_ids FROM {migrate_tools_sync_source_ids} WHERE migration_id = :mid', + [':mid' => $migrationId], + ) + ->fetchCol(); + + // Unserialize source IDs to restore their structure. + array_walk($serializedSourceIds, static function(&$entry) { + $entry = unserialize($entry); + }); + + return $serializedSourceIds; + } } -- GitLab From f310a323f13c5dd5c1f585fe41e194805c711f80 Mon Sep 17 00:00:00 2001 From: PAC Date: Sun, 30 Jul 2023 16:28:27 +0200 Subject: [PATCH 2/5] Set up database schema for PHPUnit test "Kernel\DrushTest". --- tests/src/Kernel/DrushTest.php | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/src/Kernel/DrushTest.php b/tests/src/Kernel/DrushTest.php index 38b3ad0..925d5e9 100644 --- a/tests/src/Kernel/DrushTest.php +++ b/tests/src/Kernel/DrushTest.php @@ -66,6 +66,7 @@ namespace Drupal\Tests\migrate_tools\Kernel { $this->installEntitySchema('taxonomy_term'); $this->installEntitySchema('user'); $this->installSchema('user', ['users_data']); + $this->installSchema('migrate_tools', ['migrate_tools_sync_source_ids']); $this->migrationPluginManager = $this->container->get('plugin.manager.migration'); // Handle Drush 10 vs Drush 11 differences. $logger_class = class_exists(DrushLoggerManager::class) ? DrushLoggerManager::class : LoggerInterface::class; -- GitLab From eceb0e2048bcbfce362830478e41fa2e9dbf49b2 Mon Sep 17 00:00:00 2001 From: PAC Date: Mon, 31 Jul 2023 11:24:56 +0200 Subject: [PATCH 3/5] Avoid duplicating Sync IDs during the actual migration phase, as they've already been saved to database during the PRE_IMPORT phase. 
--- migrate_tools.module | 9 +++-- src/EventSubscriber/MigrationImportSync.php | 8 ++++ src/MigrateTools.php | 42 +++++++++++++++++++++ 3 files changed, 55 insertions(+), 4 deletions(-) diff --git a/migrate_tools.module b/migrate_tools.module index 64206d9..98b5a51 100644 --- a/migrate_tools.module +++ b/migrate_tools.module @@ -88,13 +88,14 @@ function migrate_tools_migration_plugins_alter(array &$migrations): void { * @see \Drupal\migrate\Plugin\migrate\source\SourcePluginBase::next() */ function migrate_tools_migrate_prepare_row(Row $row, MigrateSourceInterface $source, MigrationInterface $migration): void { - - if (!empty($migration->syncSource)) { + /** @var MigrateTools $migrateTools */ + $migrateTools = \Drupal::service('migrate_tools.migrate_tools'); + // Act on migrations that have a Sync source, and that are currently in the + // phase of Syncing their IDs. + if (!empty($migration->syncSource) && $migrateTools->isMigrationSyncing($migration->getPluginId())) { // Keep track of all source rows here, as SourcePluginBase::next() might // skip some rows, and we need them all to detect missing items in source to // delete in destination. - /** @var MigrateTools $migrateTools */ - $migrateTools = \Drupal::service('migrate_tools.migrate_tools'); $migrateTools->addToSyncSourceIds($migration->getPluginId(), $row->getSourceIdValues()); } } diff --git a/src/EventSubscriber/MigrationImportSync.php b/src/EventSubscriber/MigrationImportSync.php index 681c2d5..5b513e2 100644 --- a/src/EventSubscriber/MigrationImportSync.php +++ b/src/EventSubscriber/MigrationImportSync.php @@ -58,6 +58,9 @@ class MigrationImportSync implements EventSubscriberInterface { $migrationId = $migration->getPluginId(); // Clear Sync IDs for this migration before starting preparing rows. $this->migrateTools->clearSyncSourceIds($migrationId); + // Activate the syncing state for this migration, so + // migrate_tools_migrate_prepare_row() can record all IDs. 
+ $this->migrateTools->setMigrationSyncingState($migrationId, TRUE); // Loop through the source to register existing source ids. // @see migrate_tools_migrate_prepare_row(). @@ -69,6 +72,11 @@ class MigrationImportSync implements EventSubscriberInterface { $source->next(); } + // Deactivate the syncing state for this migration, so + // migrate_tools_migrate_prepare_row() does not record any further IDs + // during the actual migration process. + $this->migrateTools->setMigrationSyncingState($migrationId, FALSE); + $source_id_values = $this->migrateTools->getSyncSourceIds($migrationId); $id_map = $migration->getIdMap(); diff --git a/src/MigrateTools.php b/src/MigrateTools.php index ab0ee96..716ddef 100644 --- a/src/MigrateTools.php +++ b/src/MigrateTools.php @@ -16,9 +16,24 @@ class MigrateTools { */ public const DEFAULT_ID_LIST_DELIMITER = ':'; + /** + * Maximum number of source Ids to keep in memory before flushing them + * to database. + */ protected const MAX_BUFFERED_SYNC_SOURCE_IDS_ENTRIES = 1000; + + /** + * Sync Ids buffered in RAM before flushing them to database. + */ protected array $bufferedSyncIdsEntries = []; + /** + * Array keeping track of migrations being in the Syncing IDs phase. + * Structure: List of `string => bool` where the key is the migration ID and + * the value is a boolean indicating whether it is currently syncing. + */ + protected array $syncingMigrations = []; + /** * Connection to the database. */ @@ -145,4 +160,31 @@ class MigrateTools { return $serializedSourceIds; } + + /** + * Sets the syncing state of a migration. + * + * @param string $migrationId + * Migration ID. + * @param bool $isSyncing + * State to set. + */ + public function setMigrationSyncingState(string $migrationId, bool $isSyncing): void + { + $this->syncingMigrations[$migrationId] = $isSyncing; + } + + /** + * Returns the syncing state of a migration. + * + * @param string $migrationId + * Migration ID. 
+ * + * @return bool + * Whether the migration is currently syncing its IDs or not. + */ + public function isMigrationSyncing(string $migrationId): bool + { + return $this->syncingMigrations[$migrationId] ?? FALSE; + } } -- GitLab From ddc63c9400031d4b8e7b28a491f922ebd0f4eb53 Mon Sep 17 00:00:00 2001 From: pacproduct <31311-pacproduct@users.noreply.drupalcode.org> Date: Fri, 4 Aug 2023 22:14:47 +0000 Subject: [PATCH 4/5] Remove unnecessary "@return". --- migrate_tools.install | 2 -- 1 file changed, 2 deletions(-) diff --git a/migrate_tools.install b/migrate_tools.install index 87ad80a..eee71e1 100644 --- a/migrate_tools.install +++ b/migrate_tools.install @@ -33,8 +33,6 @@ function migrate_tools_schema(): array { /** * Adds a table in the database dedicated to SyncSourceIds entries. - * - * @return void */ function migrate_tools_update_10000(): void { $schema = migrate_tools_schema(); -- GitLab From b93d30719dd0dd19a3ba58c68741417a433d0ef5 Mon Sep 17 00:00:00 2001 From: PAC Date: Sat, 30 Sep 2023 14:44:12 +0200 Subject: [PATCH 5/5] Add a primary key to the table in order to prevent warning related to the "READ-COMMITTED" MySQL transaction-isolation level. --- migrate_tools.install | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/migrate_tools.install b/migrate_tools.install index eee71e1..c8ae3ad 100644 --- a/migrate_tools.install +++ b/migrate_tools.install @@ -7,6 +7,11 @@ function migrate_tools_schema(): array { $schema['migrate_tools_sync_source_ids'] = [ 'description' => 'Table storing SyncSourceIds entries for the --sync option.', 'fields' => [ + 'id' => array( + 'description' => 'Primary Key: Unique ID.', + 'type' => 'serial', + 'not null' => TRUE, + ), 'migration_id' => [ 'description' => 'The migration ID.', 'type' => 'varchar', @@ -27,6 +32,7 @@ function migrate_tools_schema(): array { 'indexes' => [ 'migration_id' => ['migration_id'], ], + 'primary key' => ['id'], ]; return $schema; } -- GitLab