namespace Drupal\migrate\Plugin\migrate\id_map;
-use Drupal\Component\Utility\Unicode;
+use Drupal\Core\Database\DatabaseException;
use Drupal\Core\Field\BaseFieldDefinition;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Plugin\PluginBase;
use Drupal\migrate\MigrateMessage;
+use Drupal\migrate\Audit\HighestIdInterface;
use Drupal\migrate\Plugin\MigrationInterface;
use Drupal\migrate\Event\MigrateIdMapMessageEvent;
use Drupal\migrate\MigrateException;
*
* @PluginID("sql")
*/
-class Sql extends PluginBase implements MigrateIdMapInterface, ContainerFactoryPluginInterface {
+class Sql extends PluginBase implements MigrateIdMapInterface, ContainerFactoryPluginInterface, HighestIdInterface {
/**
* Column name of hashed source id values.
* The configuration for the plugin.
* @param \Drupal\migrate\Plugin\MigrationInterface $migration
* The migration to do.
+ * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
+ * The event dispatcher.
*/
public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, EventDispatcherInterface $event_dispatcher) {
parent::__construct($configuration, $plugin_id, $plugin_definition);
$this->migration = $migration;
$this->eventDispatcher = $event_dispatcher;
$this->message = new MigrateMessage();
+
+ if (!isset($this->database)) {
+ $this->database = \Drupal::database();
+ }
+
+ // Default generated table names, limited to 63 characters.
+ $machine_name = str_replace(':', '__', $this->migration->id());
+ $prefix_length = strlen($this->database->tablePrefix());
+ $this->mapTableName = 'migrate_map_' . mb_strtolower($machine_name);
+ $this->mapTableName = mb_substr($this->mapTableName, 0, 63 - $prefix_length);
+ $this->messageTableName = 'migrate_message_' . mb_strtolower($machine_name);
+ $this->messageTableName = mb_substr($this->messageTableName, 0, 63 - $prefix_length);
}
/**
* @return string
* An hash containing the hashed values of the source identifiers.
*/
- public function getSourceIDsHash(array $source_id_values) {
+ public function getSourceIdsHash(array $source_id_values) {
// When looking up the destination ID we require an array with both the
// source key and value, e.g. ['nid' => 41]. In this case, $source_id_values
// need to be ordered the same order as $this->sourceIdFields().
* The map table name.
*/
public function mapTableName() {
- $this->init();
return $this->mapTableName;
}
* The message table name.
*/
public function messageTableName() {
- $this->init();
return $this->messageTableName;
}
* The database connection object.
*/
public function getDatabase() {
- if (!isset($this->database)) {
- $this->database = \Drupal::database();
- }
$this->init();
return $this->database;
}
protected function init() {
if (!$this->initialized) {
$this->initialized = TRUE;
- // Default generated table names, limited to 63 characters.
- $machine_name = str_replace(':', '__', $this->migration->id());
- $prefix_length = strlen($this->getDatabase()->tablePrefix());
- $this->mapTableName = 'migrate_map_' . Unicode::strtolower($machine_name);
- $this->mapTableName = Unicode::substr($this->mapTableName, 0, 63 - $prefix_length);
- $this->messageTableName = 'migrate_message_' . Unicode::strtolower($machine_name);
- $this->messageTableName = Unicode::substr($this->messageTableName, 0, 63 - $prefix_length);
$this->ensureTables();
}
}
public function getRowBySource(array $source_id_values) {
$query = $this->getDatabase()->select($this->mapTableName(), 'map')
->fields('map');
- $query->condition(static::SOURCE_IDS_HASH, $this->getSourceIDsHash($source_id_values));
+ $query->condition(static::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
$result = $query->execute();
return $result->fetchAssoc();
}
/**
* {@inheritdoc}
*/
- public function lookupSourceID(array $destination_id_values) {
+ public function lookupSourceId(array $destination_id_values) {
$source_id_fields = $this->sourceIdFields();
$query = $this->getDatabase()->select($this->mapTableName(), 'map');
foreach ($source_id_fields as $source_field_name => $idmap_field_name) {
$conditions = [];
foreach ($this->sourceIdFields() as $field_name => $db_field) {
if ($is_associative) {
- // Associative $source_id_values can have fields out of order.
- if (isset($source_id_values[$field_name])) {
- $conditions[$db_field] = $source_id_values[$field_name];
+ // Ensure to handle array elements with a NULL value.
+ if (array_key_exists($field_name, $source_id_values)) {
+ // Associative $source_id_values can have fields out of order.
+ if (isset($source_id_values[$field_name])) {
+ // Only add a condition if the value is not NULL.
+ $conditions[$db_field] = $source_id_values[$field_name];
+ }
unset($source_id_values[$field_name]);
}
}
}
if (!empty($source_id_values)) {
- throw new MigrateException("Extra unknown items in source IDs");
+ $var_dump = var_export($source_id_values, TRUE);
+ throw new MigrateException(sprintf("Extra unknown items in source IDs: %s", $var_dump));
}
$query = $this->getDatabase()->select($this->mapTableName(), 'map')
->fields('map', $this->destinationIdFields());
if (count($this->sourceIdFields()) === count($conditions)) {
// Optimization: Use the primary key.
- $query->condition(self::SOURCE_IDS_HASH, $this->getSourceIDsHash(array_values($conditions)));
+ $query->condition(self::SOURCE_IDS_HASH, $this->getSourceIdsHash(array_values($conditions)));
}
else {
foreach ($conditions as $db_field => $value) {
if ($this->migration->getTrackLastImported()) {
$fields['last_imported'] = time();
}
- $keys = [static::SOURCE_IDS_HASH => $this->getSourceIDsHash($source_id_values)];
+ $keys = [static::SOURCE_IDS_HASH => $this->getSourceIdsHash($source_id_values)];
// Notify anyone listening of the map row we're about to save.
$this->eventDispatcher->dispatch(MigrateEvents::MAP_SAVE, new MigrateMapSaveEvent($this, $fields));
$this->getDatabase()->merge($this->mapTableName())
return;
}
}
- $fields[static::SOURCE_IDS_HASH] = $this->getSourceIDsHash($source_id_values);
+ $fields[static::SOURCE_IDS_HASH] = $this->getSourceIdsHash($source_id_values);
$fields['level'] = $level;
$fields['message'] = $message;
$this->getDatabase()->insert($this->messageTableName())
$query = $this->getDatabase()->select($this->messageTableName(), 'msg')
->fields('msg');
if ($source_id_values) {
- $query->condition(static::SOURCE_IDS_HASH, $this->getSourceIDsHash($source_id_values));
+ $query->condition(static::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
}
if ($level) {
* {@inheritdoc}
*/
public function processedCount() {
- return $this->getDatabase()->select($this->mapTableName())
- ->countQuery()
- ->execute()
- ->fetchField();
+ return $this->countHelper(NULL, $this->mapTableName());
}
/**
* {@inheritdoc}
*/
public function importedCount() {
- return $this->getDatabase()->select($this->mapTableName())
- ->condition('source_row_status', [MigrateIdMapInterface::STATUS_IMPORTED, MigrateIdMapInterface::STATUS_NEEDS_UPDATE], 'IN')
- ->countQuery()
- ->execute()
- ->fetchField();
+ return $this->countHelper([
+ MigrateIdMapInterface::STATUS_IMPORTED,
+ MigrateIdMapInterface::STATUS_NEEDS_UPDATE,
+ ]);
}
/**
/**
* Counts records in a table.
*
- * @param int $status
- * An integer for the source_row_status column.
+ * @param int|array $status
+ * (optional) Status code(s) to filter the source_row_status column.
* @param string $table
* (optional) The table to work. Defaults to NULL.
*
* @return int
* The number of records.
*/
- protected function countHelper($status, $table = NULL) {
- $query = $this->getDatabase()->select($table ?: $this->mapTableName());
+ protected function countHelper($status = NULL, $table = NULL) {
+ // Use database directly to avoid creating tables.
+ $query = $this->database->select($table ?: $this->mapTableName());
if (isset($status)) {
- $query->condition('source_row_status', $status);
+ $query->condition('source_row_status', $status, is_array($status) ? 'IN' : '=');
}
- return $query->countQuery()->execute()->fetchField();
+ try {
+ $count = (int) $query->countQuery()->execute()->fetchField();
+ }
+ catch (DatabaseException $e) {
+ // The table does not exist, therefore there are no records.
+ $count = 0;
+ }
+ return $count;
}
/**
if (!$messages_only) {
$map_query = $this->getDatabase()->delete($this->mapTableName());
- $map_query->condition(static::SOURCE_IDS_HASH, $this->getSourceIDsHash($source_id_values));
+ $map_query->condition(static::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
// Notify anyone listening of the map row we're about to delete.
$this->eventDispatcher->dispatch(MigrateEvents::MAP_DELETE, new MigrateMapDeleteEvent($this, $source_id_values));
$map_query->execute();
}
$message_query = $this->getDatabase()->delete($this->messageTableName());
- $message_query->condition(static::SOURCE_IDS_HASH, $this->getSourceIDsHash($source_id_values));
+ $message_query->condition(static::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
$message_query->execute();
}
public function deleteDestination(array $destination_id_values) {
$map_query = $this->getDatabase()->delete($this->mapTableName());
$message_query = $this->getDatabase()->delete($this->messageTableName());
- $source_id_values = $this->lookupSourceID($destination_id_values);
+ $source_id_values = $this->lookupSourceId($destination_id_values);
if (!empty($source_id_values)) {
foreach ($this->destinationIdFields() as $field_name => $destination_id) {
$map_query->condition($destination_id, $destination_id_values[$field_name]);
$this->eventDispatcher->dispatch(MigrateEvents::MAP_DELETE, new MigrateMapDeleteEvent($this, $source_id_values));
$map_query->execute();
- $message_query->condition(static::SOURCE_IDS_HASH, $this->getSourceIDsHash($source_id_values));
+ $message_query->condition(static::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
$message_query->execute();
}
}
return $this->currentRow !== FALSE;
}
+ /**
+ * Returns the migration plugin manager.
+ *
+ * @todo Inject as a dependency in https://www.drupal.org/node/2919158.
+ *
+ * @return \Drupal\migrate\Plugin\MigrationPluginManagerInterface
+ * The migration plugin manager.
+ */
+ protected function getMigrationPluginManager() {
+ return \Drupal::service('plugin.manager.migration');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getHighestId() {
+ array_filter(
+ $this->migration->getDestinationPlugin()->getIds(),
+ function (array $id) {
+ if ($id['type'] !== 'integer') {
+ throw new \LogicException('Cannot determine the highest migrated ID without an integer ID column');
+ }
+ }
+ );
+
+ // List of mapping tables to look in for the highest ID.
+ $map_tables = [
+ $this->migration->id() => $this->mapTableName(),
+ ];
+
+ // If there's a bundle, it means we have a derived migration and we need to
+ // find all the mapping tables from the related derived migrations.
+ if ($base_id = substr($this->migration->id(), 0, strpos($this->migration->id(), static::DERIVATIVE_SEPARATOR))) {
+ $migration_manager = $this->getMigrationPluginManager();
+ $migrations = $migration_manager->getDefinitions();
+ foreach ($migrations as $migration_id => $migration) {
+ if ($migration['id'] === $base_id) {
+ // Get this derived migration's mapping table and add it to the list
+ // of mapping tables to look in for the highest ID.
+ $stub = $migration_manager->createInstance($migration_id);
+ $map_tables[$migration_id] = $stub->getIdMap()->mapTableName();
+ }
+ }
+ }
+
+ // Get the highest id from the list of map tables.
+ $ids = [0];
+ foreach ($map_tables as $map_table) {
+ // If the map_table does not exist then continue on to the next map_table.
+ if (!$this->getDatabase()->schema()->tableExists($map_table)) {
+ continue;
+ }
+
+ $query = $this->getDatabase()->select($map_table, 'map')
+ ->fields('map', $this->destinationIdFields())
+ ->range(0, 1);
+ foreach (array_values($this->destinationIdFields()) as $order_field) {
+ $query->orderBy($order_field, 'DESC');
+ }
+ $ids[] = $query->execute()->fetchField();
+ }
+
+ // Return the highest of all the mapped IDs.
+ return (int) max($ids);
+ }
+
}