3 namespace Drupal\migrate\Plugin\migrate\id_map;
5 use Drupal\Component\Utility\Unicode;
6 use Drupal\Core\Field\BaseFieldDefinition;
7 use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
8 use Drupal\Core\Plugin\PluginBase;
9 use Drupal\migrate\MigrateMessage;
10 use Drupal\migrate\Plugin\MigrationInterface;
11 use Drupal\migrate\Event\MigrateIdMapMessageEvent;
12 use Drupal\migrate\MigrateException;
13 use Drupal\migrate\MigrateMessageInterface;
14 use Drupal\migrate\Plugin\MigrateIdMapInterface;
15 use Drupal\migrate\Row;
16 use Drupal\migrate\Event\MigrateEvents;
17 use Drupal\migrate\Event\MigrateMapSaveEvent;
18 use Drupal\migrate\Event\MigrateMapDeleteEvent;
19 use Symfony\Component\DependencyInjection\ContainerInterface;
20 use Symfony\Component\EventDispatcher\EventDispatcherInterface;
23 * Defines the sql based ID map implementation.
25 * It creates one map and one message table per migration entity to store the
26 * relevant information.
30 class Sql extends PluginBase implements MigrateIdMapInterface, ContainerFactoryPluginInterface {
33 * Column name of hashed source id values.
35 const SOURCE_IDS_HASH = 'source_ids_hash';
38 * An event dispatcher instance to use for map events.
40 * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
42 protected $eventDispatcher;
45 * The migration map table name.
49 protected $mapTableName;
52 * The message table name.
56 protected $messageTableName;
59 * The migrate message service.
61 * @var \Drupal\migrate\MigrateMessageInterface
66 * The database connection for the map/message tables on the destination.
68 * @var \Drupal\Core\Database\Connection
75 * @var \Drupal\Core\Database\Query\SelectInterface
80 * The migration being done.
82 * @var \Drupal\migrate\Plugin\MigrationInterface
87 * The source ID fields.
91 protected $sourceIdFields;
94 * The destination ID fields.
98 protected $destinationIdFields;
101 * Whether the plugin is already initialized.
105 protected $initialized;
112 protected $result = NULL;
115 * The source identifiers.
119 protected $sourceIds = [];
122 * The destination identifiers.
126 protected $destinationIds = [];
133 protected $currentRow = NULL;
140 protected $currentKey = [];
143 * Constructs an SQL object.
145 * Sets up the tables and builds the maps,
147 * @param array $configuration
149 * @param string $plugin_id
150 * The plugin ID for the migration process to do.
151 * @param mixed $plugin_definition
152 * The configuration for the plugin.
153 * @param \Drupal\migrate\Plugin\MigrationInterface $migration
154 * The migration to do.
156 public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, EventDispatcherInterface $event_dispatcher) {
157 parent::__construct($configuration, $plugin_id, $plugin_definition);
158 $this->migration = $migration;
159 $this->eventDispatcher = $event_dispatcher;
160 $this->message = new MigrateMessage();
166 public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
172 $container->get('event_dispatcher')
177 * Retrieves the hash of the source identifier values.
181 * @param array $source_id_values
182 * The source identifiers
185 * An hash containing the hashed values of the source identifiers.
187 public function getSourceIDsHash(array $source_id_values) {
188 // When looking up the destination ID we require an array with both the
189 // source key and value, e.g. ['nid' => 41]. In this case, $source_id_values
190 // need to be ordered the same order as $this->sourceIdFields().
191 // However, the Migration process plugin doesn't currently have a way to get
192 // the source key so we presume the values have been passed through in the
194 if (!isset($source_id_values[0])) {
195 $source_id_values_keyed = [];
196 foreach ($this->sourceIdFields() as $field_name => $source_id) {
197 $source_id_values_keyed[] = $source_id_values[$field_name];
199 $source_id_values = $source_id_values_keyed;
201 return hash('sha256', serialize(array_map('strval', $source_id_values)));
205 * The source ID fields.
208 * The source ID fields.
210 protected function sourceIdFields() {
211 if (!isset($this->sourceIdFields)) {
212 // Build the source and destination identifier maps.
213 $this->sourceIdFields = [];
215 foreach ($this->migration->getSourcePlugin()->getIds() as $field => $schema) {
216 $this->sourceIdFields[$field] = 'sourceid' . $count++;
219 return $this->sourceIdFields;
223 * The destination ID fields.
226 * The destination ID fields.
228 protected function destinationIdFields() {
229 if (!isset($this->destinationIdFields)) {
230 $this->destinationIdFields = [];
232 foreach ($this->migration->getDestinationPlugin()->getIds() as $field => $schema) {
233 $this->destinationIdFields[$field] = 'destid' . $count++;
236 return $this->destinationIdFields;
240 * The name of the database map table.
243 * The map table name.
245 public function mapTableName() {
247 return $this->mapTableName;
251 * The name of the database message table.
254 * The message table name.
256 public function messageTableName() {
258 return $this->messageTableName;
262 * Get the fully qualified map table name.
265 * The fully qualified map table name.
267 public function getQualifiedMapTableName() {
268 return $this->getDatabase()->getFullQualifiedTableName($this->mapTableName);
272 * Gets the database connection.
274 * @return \Drupal\Core\Database\Connection
275 * The database connection object.
277 public function getDatabase() {
278 if (!isset($this->database)) {
279 $this->database = \Drupal::database();
282 return $this->database;
286 * Initialize the plugin.
288 protected function init() {
289 if (!$this->initialized) {
290 $this->initialized = TRUE;
291 // Default generated table names, limited to 63 characters.
292 $machine_name = str_replace(':', '__', $this->migration->id());
293 $prefix_length = strlen($this->getDatabase()->tablePrefix());
294 $this->mapTableName = 'migrate_map_' . Unicode::strtolower($machine_name);
295 $this->mapTableName = Unicode::substr($this->mapTableName, 0, 63 - $prefix_length);
296 $this->messageTableName = 'migrate_message_' . Unicode::strtolower($machine_name);
297 $this->messageTableName = Unicode::substr($this->messageTableName, 0, 63 - $prefix_length);
298 $this->ensureTables();
305 public function setMessage(MigrateMessageInterface $message) {
306 $this->message = $message;
310 * Create the map and message tables if they don't already exist.
312 protected function ensureTables() {
313 if (!$this->getDatabase()->schema()->tableExists($this->mapTableName)) {
314 // Generate appropriate schema info for the map and message tables,
315 // and map from the source field names to the map/msg field names.
317 $source_id_schema = [];
319 foreach ($this->migration->getSourcePlugin()->getIds() as $id_definition) {
320 $mapkey = 'sourceid' . $count++;
321 $indexes['source'][] = $mapkey;
322 $source_id_schema[$mapkey] = $this->getFieldSchema($id_definition);
323 $source_id_schema[$mapkey]['not null'] = TRUE;
326 $source_ids_hash[static::SOURCE_IDS_HASH] = [
330 'description' => 'Hash of source ids. Used as primary key',
332 $fields = $source_ids_hash + $source_id_schema;
334 // Add destination identifiers to map table.
335 // @todo How do we discover the destination schema?
337 foreach ($this->migration->getDestinationPlugin()->getIds() as $id_definition) {
338 // Allow dest identifier fields to be NULL (for IGNORED/FAILED cases).
339 $mapkey = 'destid' . $count++;
340 $fields[$mapkey] = $this->getFieldSchema($id_definition);
341 $fields[$mapkey]['not null'] = FALSE;
343 $fields['source_row_status'] = [
348 'default' => MigrateIdMapInterface::STATUS_IMPORTED,
349 'description' => 'Indicates current status of the source row',
351 $fields['rollback_action'] = [
356 'default' => MigrateIdMapInterface::ROLLBACK_DELETE,
357 'description' => 'Flag indicating what to do for this item on rollback',
359 $fields['last_imported'] = [
364 'description' => 'UNIX timestamp of the last time this row was imported',
370 'description' => 'Hash of source row data, for detecting changes',
373 'description' => 'Mappings from source identifier value(s) to destination identifier value(s).',
375 'primary key' => [static::SOURCE_IDS_HASH],
376 'indexes' => $indexes,
378 $this->getDatabase()->schema()->createTable($this->mapTableName, $schema);
380 // Now do the message table.
381 if (!$this->getDatabase()->schema()->tableExists($this->messageTableName())) {
388 $fields += $source_ids_hash;
396 $fields['message'] = [
402 'description' => 'Messages generated during a migration process',
404 'primary key' => ['msgid'],
406 $this->getDatabase()->schema()->createTable($this->messageTableName(), $schema);
410 // Add any missing columns to the map table.
411 if (!$this->getDatabase()->schema()->fieldExists($this->mapTableName,
412 'rollback_action')) {
413 $this->getDatabase()->schema()->addField($this->mapTableName, 'rollback_action',
420 'description' => 'Flag indicating what to do for this item on rollback',
424 if (!$this->getDatabase()->schema()->fieldExists($this->mapTableName, 'hash')) {
425 $this->getDatabase()->schema()->addField($this->mapTableName, 'hash',
430 'description' => 'Hash of source row data, for detecting changes',
434 if (!$this->getDatabase()->schema()->fieldExists($this->mapTableName, static::SOURCE_IDS_HASH)) {
435 $this->getDatabase()->schema()->addField($this->mapTableName, static::SOURCE_IDS_HASH, [
439 'description' => 'Hash of source ids. Used as primary key',
446 * Creates schema from an ID definition.
448 * @param array $id_definition
449 * The definition of the field having the structure as the items returned by
450 * MigrateSourceInterface or MigrateDestinationInterface::getIds().
453 * The database schema definition.
455 * @see \Drupal\migrate\Plugin\MigrateSourceInterface::getIds()
456 * @see \Drupal\migrate\Plugin\MigrateDestinationInterface::getIds()
458 protected function getFieldSchema(array $id_definition) {
459 $type_parts = explode('.', $id_definition['type']);
460 if (count($type_parts) == 1) {
461 $type_parts[] = 'value';
463 unset($id_definition['type']);
465 // Get the field storage definition.
466 $definition = BaseFieldDefinition::create($type_parts[0]);
468 // Get a list of setting keys belonging strictly to the field definition.
469 $default_field_settings = $definition->getSettings();
470 // Separate field definition settings from custom settings. Custom settings
471 // are settings passed in $id_definition that are not part of field storage
472 // definition settings.
473 $field_settings = array_intersect_key($id_definition, $default_field_settings);
474 $custom_settings = array_diff_key($id_definition, $default_field_settings);
476 // Resolve schema from field storage definition settings.
477 $schema = $definition
478 ->setSettings($field_settings)
479 ->getColumns()[$type_parts[1]];
481 // Merge back custom settings.
482 return $schema + $custom_settings;
488 public function getRowBySource(array $source_id_values) {
489 $query = $this->getDatabase()->select($this->mapTableName(), 'map')
491 $query->condition(static::SOURCE_IDS_HASH, $this->getSourceIDsHash($source_id_values));
492 $result = $query->execute();
493 return $result->fetchAssoc();
499 public function getRowByDestination(array $destination_id_values) {
500 $query = $this->getDatabase()->select($this->mapTableName(), 'map')
502 foreach ($this->destinationIdFields() as $field_name => $destination_id) {
503 $query->condition("map.$destination_id", $destination_id_values[$field_name], '=');
505 $result = $query->execute();
506 return $result->fetchAssoc();
512 public function getRowsNeedingUpdate($count) {
514 $result = $this->getDatabase()->select($this->mapTableName(), 'map')
516 ->condition('source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE)
519 foreach ($result as $row) {
528 public function lookupSourceID(array $destination_id_values) {
529 $source_id_fields = $this->sourceIdFields();
530 $query = $this->getDatabase()->select($this->mapTableName(), 'map');
531 foreach ($source_id_fields as $source_field_name => $idmap_field_name) {
532 $query->addField('map', $idmap_field_name, $source_field_name);
534 foreach ($this->destinationIdFields() as $field_name => $destination_id) {
535 $query->condition("map.$destination_id", $destination_id_values[$field_name], '=');
537 $result = $query->execute();
538 return $result->fetchAssoc() ?: [];
544 public function lookupDestinationId(array $source_id_values) {
545 $results = $this->lookupDestinationIds($source_id_values);
546 return $results ? reset($results) : [];
552 public function lookupDestinationIds(array $source_id_values) {
553 if (empty($source_id_values)) {
557 // Canonicalize the keys into a hash of DB-field => value.
558 $is_associative = !isset($source_id_values[0]);
560 foreach ($this->sourceIdFields() as $field_name => $db_field) {
561 if ($is_associative) {
562 // Associative $source_id_values can have fields out of order.
563 if (isset($source_id_values[$field_name])) {
564 $conditions[$db_field] = $source_id_values[$field_name];
565 unset($source_id_values[$field_name]);
569 // For non-associative $source_id_values, we assume they're the first
571 if (empty($source_id_values)) {
574 $conditions[$db_field] = array_shift($source_id_values);
578 if (!empty($source_id_values)) {
579 throw new MigrateException("Extra unknown items in source IDs");
582 $query = $this->getDatabase()->select($this->mapTableName(), 'map')
583 ->fields('map', $this->destinationIdFields());
584 if (count($this->sourceIdFields()) === count($conditions)) {
585 // Optimization: Use the primary key.
586 $query->condition(self::SOURCE_IDS_HASH, $this->getSourceIDsHash(array_values($conditions)));
589 foreach ($conditions as $db_field => $value) {
590 $query->condition($db_field, $value);
594 return $query->execute()->fetchAll(\PDO::FETCH_NUM);
600 public function saveIdMapping(Row $row, array $destination_id_values, $source_row_status = MigrateIdMapInterface::STATUS_IMPORTED, $rollback_action = MigrateIdMapInterface::ROLLBACK_DELETE) {
601 // Construct the source key.
602 $source_id_values = $row->getSourceIdValues();
603 // Construct the source key and initialize to empty variable keys.
605 foreach ($this->sourceIdFields() as $field_name => $key_name) {
606 // A NULL key value is usually an indication of a problem.
607 if (!isset($source_id_values[$field_name])) {
608 $this->message->display($this->t(
609 'Did not save to map table due to NULL value for key field @field',
610 ['@field' => $field_name]), 'error');
613 $fields[$key_name] = $source_id_values[$field_name];
621 'source_row_status' => (int) $source_row_status,
622 'rollback_action' => (int) $rollback_action,
623 'hash' => $row->getHash(),
626 foreach ($destination_id_values as $dest_id) {
627 $fields['destid' . ++$count] = $dest_id;
629 if ($count && $count != count($this->destinationIdFields())) {
630 $this->message->display(t('Could not save to map table due to missing destination id values'), 'error');
633 if ($this->migration->getTrackLastImported()) {
634 $fields['last_imported'] = time();
636 $keys = [static::SOURCE_IDS_HASH => $this->getSourceIDsHash($source_id_values)];
637 // Notify anyone listening of the map row we're about to save.
638 $this->eventDispatcher->dispatch(MigrateEvents::MAP_SAVE, new MigrateMapSaveEvent($this, $fields));
639 $this->getDatabase()->merge($this->mapTableName())
648 public function saveMessage(array $source_id_values, $message, $level = MigrationInterface::MESSAGE_ERROR) {
649 foreach ($this->sourceIdFields() as $field_name => $source_id) {
650 // If any key value is not set, we can't save.
651 if (!isset($source_id_values[$field_name])) {
655 $fields[static::SOURCE_IDS_HASH] = $this->getSourceIDsHash($source_id_values);
656 $fields['level'] = $level;
657 $fields['message'] = $message;
658 $this->getDatabase()->insert($this->messageTableName())
662 // Notify anyone listening of the message we've saved.
663 $this->eventDispatcher->dispatch(MigrateEvents::IDMAP_MESSAGE,
664 new MigrateIdMapMessageEvent($this->migration, $source_id_values, $message, $level));
670 public function getMessageIterator(array $source_id_values = [], $level = NULL) {
671 $query = $this->getDatabase()->select($this->messageTableName(), 'msg')
673 if ($source_id_values) {
674 $query->condition(static::SOURCE_IDS_HASH, $this->getSourceIDsHash($source_id_values));
678 $query->condition('level', $level);
680 return $query->execute();
686 public function prepareUpdate() {
687 $this->getDatabase()->update($this->mapTableName())
688 ->fields(['source_row_status' => MigrateIdMapInterface::STATUS_NEEDS_UPDATE])
695 public function processedCount() {
696 return $this->getDatabase()->select($this->mapTableName())
705 public function importedCount() {
706 return $this->getDatabase()->select($this->mapTableName())
707 ->condition('source_row_status', [MigrateIdMapInterface::STATUS_IMPORTED, MigrateIdMapInterface::STATUS_NEEDS_UPDATE], 'IN')
716 public function updateCount() {
717 return $this->countHelper(MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
723 public function errorCount() {
724 return $this->countHelper(MigrateIdMapInterface::STATUS_FAILED);
730 public function messageCount() {
731 return $this->countHelper(NULL, $this->messageTableName());
735 * Counts records in a table.
738 * An integer for the source_row_status column.
739 * @param string $table
740 * (optional) The table to work. Defaults to NULL.
743 * The number of records.
745 protected function countHelper($status, $table = NULL) {
746 $query = $this->getDatabase()->select($table ?: $this->mapTableName());
747 if (isset($status)) {
748 $query->condition('source_row_status', $status);
750 return $query->countQuery()->execute()->fetchField();
756 public function delete(array $source_id_values, $messages_only = FALSE) {
757 if (empty($source_id_values)) {
758 throw new MigrateException('Without source identifier values it is impossible to find the row to delete.');
761 if (!$messages_only) {
762 $map_query = $this->getDatabase()->delete($this->mapTableName());
763 $map_query->condition(static::SOURCE_IDS_HASH, $this->getSourceIDsHash($source_id_values));
764 // Notify anyone listening of the map row we're about to delete.
765 $this->eventDispatcher->dispatch(MigrateEvents::MAP_DELETE, new MigrateMapDeleteEvent($this, $source_id_values));
766 $map_query->execute();
768 $message_query = $this->getDatabase()->delete($this->messageTableName());
769 $message_query->condition(static::SOURCE_IDS_HASH, $this->getSourceIDsHash($source_id_values));
770 $message_query->execute();
776 public function deleteDestination(array $destination_id_values) {
777 $map_query = $this->getDatabase()->delete($this->mapTableName());
778 $message_query = $this->getDatabase()->delete($this->messageTableName());
779 $source_id_values = $this->lookupSourceID($destination_id_values);
780 if (!empty($source_id_values)) {
781 foreach ($this->destinationIdFields() as $field_name => $destination_id) {
782 $map_query->condition($destination_id, $destination_id_values[$field_name]);
784 // Notify anyone listening of the map row we're about to delete.
785 $this->eventDispatcher->dispatch(MigrateEvents::MAP_DELETE, new MigrateMapDeleteEvent($this, $source_id_values));
786 $map_query->execute();
788 $message_query->condition(static::SOURCE_IDS_HASH, $this->getSourceIDsHash($source_id_values));
789 $message_query->execute();
796 public function setUpdate(array $source_id_values) {
797 if (empty($source_id_values)) {
798 throw new MigrateException('No source identifiers provided to update.');
800 $query = $this->getDatabase()
801 ->update($this->mapTableName())
802 ->fields(['source_row_status' => MigrateIdMapInterface::STATUS_NEEDS_UPDATE]);
804 foreach ($this->sourceIdFields() as $field_name => $source_id) {
805 $query->condition($source_id, $source_id_values[$field_name]);
813 public function clearMessages() {
814 $this->getDatabase()->truncate($this->messageTableName())->execute();
820 public function destroy() {
821 $this->getDatabase()->schema()->dropTable($this->mapTableName());
822 $this->getDatabase()->schema()->dropTable($this->messageTableName());
826 * Implementation of \Iterator::rewind().
828 * This is called before beginning a foreach loop.
830 public function rewind() {
831 $this->currentRow = NULL;
833 foreach ($this->sourceIdFields() as $field) {
836 foreach ($this->destinationIdFields() as $field) {
839 $this->result = $this->getDatabase()->select($this->mapTableName(), 'map')
840 ->fields('map', $fields)
847 * Implementation of \Iterator::current().
849 * This is called when entering a loop iteration, returning the current row.
851 public function current() {
852 return $this->currentRow;
856 * Implementation of \Iterator::key().
858 * This is called when entering a loop iteration, returning the key of the
859 * current row. It must be a scalar - we will serialize to fulfill the
860 * requirement, but using getCurrentKey() is preferable.
862 public function key() {
863 return serialize($this->currentKey);
869 public function currentDestination() {
870 if ($this->valid()) {
872 foreach ($this->destinationIdFields() as $destination_field_name => $idmap_field_name) {
873 if (!is_null($this->currentRow[$idmap_field_name])) {
874 $result[$destination_field_name] = $this->currentRow[$idmap_field_name];
887 public function currentSource() {
888 if ($this->valid()) {
890 foreach ($this->sourceIdFields() as $field_name => $source_id) {
891 $result[$field_name] = $this->currentKey[$source_id];
901 * Implementation of \Iterator::next().
903 * This is called at the bottom of the loop implicitly, as well as explicitly
906 public function next() {
907 $this->currentRow = $this->result->fetchAssoc();
908 $this->currentKey = [];
909 if ($this->currentRow) {
910 foreach ($this->sourceIdFields() as $map_field) {
911 $this->currentKey[$map_field] = $this->currentRow[$map_field];
912 // Leave only destination fields.
913 unset($this->currentRow[$map_field]);
919 * Implementation of \Iterator::valid().
921 * This is called at the top of the loop, returning TRUE to process the loop
922 * and FALSE to terminate it.
924 public function valid() {
925 return $this->currentRow !== FALSE;