3 namespace Drupal\migrate\Plugin\migrate\source;
5 use Drupal\Core\Database\ConnectionNotDefinedException;
6 use Drupal\Core\Database\Database;
7 use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
8 use Drupal\Core\State\StateInterface;
9 use Drupal\migrate\Exception\RequirementsException;
10 use Drupal\migrate\MigrateException;
11 use Drupal\migrate\Plugin\MigrationInterface;
12 use Drupal\migrate\Plugin\migrate\id_map\Sql;
13 use Drupal\migrate\Plugin\MigrateIdMapInterface;
14 use Drupal\migrate\Plugin\RequirementsInterface;
15 use Symfony\Component\DependencyInjection\ContainerInterface;
18 * Sources whose data may be fetched via a database connection.
20 * Available configuration keys:
21 * - database_state_key: (optional) Name of the state key which contains an
22 * array with database connection information.
23 * - key: (optional) The database key name. Defaults to 'migrate'.
24 * - target: (optional) The database target name. Defaults to 'default'.
25 * - batch_size: (optional) Number of records to fetch from the database during
26 * each batch. If omitted, all records are fetched in a single query.
 * - ignore_map: (optional) Source data is joined to the map table by default to
 *   improve migration performance. If set to TRUE, the map table will not be
 *   joined. Using expressions in the query may result in column aliases in the
 *   JOIN clause which would be invalid SQL. If you run into this, set
 *   ignore_map to TRUE.
33 * For other optional configuration keys inherited from the parent class, refer
34 * to \Drupal\migrate\Plugin\migrate\source\SourcePluginBase.
36 * About the source database determination:
37 * - If the source plugin configuration contains 'database_state_key', its value
38 * is taken as the name of a state key which contains an array with the
39 * database configuration.
40 * - Otherwise, if the source plugin configuration contains 'key', the database
41 * configuration with that name is used.
42 * - If both 'database_state_key' and 'key' are omitted in the source plugin
43 * configuration, the database connection named 'migrate' is used by default.
44 * - If all of the above steps fail, RequirementsException is thrown.
 * Drupal Database API supports multiple database connections. The connection
 * parameters are defined in the $databases array in settings.php or
 * settings.local.php. It is also possible to modify the $databases array at
 * runtime. For example, Migrate Drupal, which provides the migrations from
 * Drupal 6 / 7, asks for the source database connection parameters in the UI
 * and then adds the $databases['migrate'] connection at runtime before the
 * migrations are executed.
54 * As described above, the default source database is $databases['migrate']. If
55 * the source plugin needs another source connection, the database connection
56 * parameters should be added to the $databases array as, for instance,
57 * $databases['foo']. The source plugin can then use this connection by setting
58 * 'key' to 'foo' in its configuration.
60 * For a complete example on migrating data from an SQL source, refer to
61 * https://www.drupal.org/docs/8/api/migrate-api/migrating-data-from-sql-source
63 * @see https://www.drupal.org/docs/8/api/database-api
64 * @see \Drupal\migrate_drupal\Plugin\migrate\source\DrupalSqlBase
66 abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPluginInterface, RequirementsInterface {
71 * @var \Drupal\Core\Database\Query\SelectInterface
76 * The database object.
78 * @var \Drupal\Core\Database\Connection
83 * State service for retrieving database info.
85 * @var \Drupal\Core\State\StateInterface
90 * The count of the number of batches run.
/**
 * How many records each batch pulls from the source database.
 *
 * Zero means the source is not batched at all.
 *
 * @var int
 */
protected $batchSize = 0;
/**
 * Constructs the SQL source plugin and stores the state service.
 *
 * @param array $configuration
 *   The plugin configuration. 'ignore_map' is forced to TRUE here when a
 *   high water property is in use but no high water mark exists yet.
 * @param string $plugin_id
 *   The plugin ID.
 * @param mixed $plugin_definition
 *   The plugin definition.
 * @param \Drupal\migrate\Plugin\MigrationInterface $migration
 *   The migration this source plugin belongs to.
 * @param \Drupal\Core\State\StateInterface $state
 *   The state service, kept for looking up database connection info.
 */
public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, StateInterface $state) {
  parent::__construct($configuration, $plugin_id, $plugin_definition, $migration);
  $this->state = $state;

  // A high water property without a stored mark means every available record
  // is wanted, so the map table must not be joined.
  if ($this->getHighWaterProperty()) {
    if ($this->getHighWater() === NULL) {
      $this->configuration['ignore_map'] = TRUE;
    }
  }
}
/**
 * Instantiates the plugin, pulling the state service from the container.
 *
 * @param \Symfony\Component\DependencyInjection\ContainerInterface $container
 *   The service container; only the 'state' service is read from it.
 * @param array $configuration
 *   The plugin configuration.
 * @param string $plugin_id
 *   The plugin ID.
 * @param mixed $plugin_definition
 *   The plugin definition.
 * @param \Drupal\migrate\Plugin\MigrationInterface|null $migration
 *   The migration this source plugin belongs to.
 *
 * @return static
 *   A new instance of the called class.
 */
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
  $state = $container->get('state');
  return new static($configuration, $plugin_id, $plugin_definition, $migration, $state);
}
/**
 * Renders the source query as a string.
 *
 * @return string
 *   The string form of the object returned by query().
 */
public function __toString() {
  $query = $this->query();
  return (string) $query;
}
/**
 * Returns the source database connection, resolving it on first use.
 *
 * The connection information is taken from the first of these that applies:
 * the state entry named by the 'database_state_key' configuration, the
 * plugin configuration itself when it contains 'key', the state entry named
 * by the 'migrate.fallback_state_key' state value, or an empty array, which
 * lets setUpDatabase() apply its own defaults.
 *
 * @return \Drupal\Core\Database\Connection
 *   The database connection.
 */
public function getDatabase() {
  // Already resolved on an earlier call.
  if (isset($this->database)) {
    return $this->database;
  }

  if (isset($this->configuration['database_state_key'])) {
    // An explicit state key holding the connection configuration wins.
    // NOTE(review): the state value is passed on unchecked; confirm the state
    // entry is always an array when this key is configured.
    $database_info = $this->state->get($this->configuration['database_state_key']);
  }
  elseif (isset($this->configuration['key'])) {
    // Explicit connection settings in the source plugin configuration.
    $database_info = $this->configuration;
  }
  elseif (($fallback_state_key = $this->state->get('migrate.fallback_state_key'))) {
    // The globally configured fallback state key.
    $database_info = $this->state->get($fallback_state_key);
  }
  else {
    // Nothing configured: setUpDatabase() falls back to the 'migrate' key.
    $database_info = [];
  }

  $this->database = $this->setUpDatabase($database_info);
  return $this->database;
}
/**
 * Gets a connection to the referenced database.
 *
 * This method will add the database connection if necessary.
 *
 * @param array $database_info
 *   Configuration for the source database connection. The keys are:
 *    'key' - The database connection key. Defaults to 'migrate'.
 *    'target' - The database connection target. Defaults to 'default'.
 *    'database' - Database configuration array as accepted by
 *      Database::addConnectionInfo.
 *
 * @return \Drupal\Core\Database\Connection
 *   The connection to use for this plugin's queries.
 *
 * @throws \Drupal\migrate\Exception\RequirementsException
 *   Thrown if no source database connection is configured, i.e. the
 *   'migrate' connection was requested and is not defined.
 * @throws \Drupal\Core\Database\ConnectionNotDefinedException
 *   Rethrown when any other requested connection is not defined.
 */
protected function setUpDatabase(array $database_info) {
  // If there is no explicit database configuration at all, fall back to a
  // connection named 'migrate' on the 'default' target.
  $key = isset($database_info['key']) ? $database_info['key'] : 'migrate';
  $target = isset($database_info['target']) ? $database_info['target'] : 'default';

  if (isset($database_info['database'])) {
    Database::addConnectionInfo($key, $target, $database_info['database']);
  }

  try {
    $connection = Database::getConnection($target, $key);
  }
  catch (ConnectionNotDefinedException $e) {
    // A missing 'migrate' connection means the source is simply not
    // configured, so report it as an unmet requirement. The comparison is
    // strict so a non-string key can never be mistaken for 'migrate'.
    if ($key === 'migrate') {
      throw new RequirementsException("No database connection configured for source plugin " . $this->pluginId, [], 0, $e);
    }
    throw $e;
  }

  return $connection;
}
/**
 * Verifies that the source database connection can be obtained.
 *
 * The connection is only resolved when the plugin definition's
 * 'requirements_met' entry is strictly TRUE; any exception comes from
 * getDatabase().
 */
public function checkRequirements() {
  if ($this->pluginDefinition['requirements_met'] !== TRUE) {
    return;
  }
  $this->getDatabase();
}
/**
 * Starts a select query on the source database.
 *
 * The 'fetch' option is always overwritten with \PDO::FETCH_ASSOC.
 *
 * @param mixed $table
 *   Passed through to the connection's select() method.
 * @param string|null $alias
 *   Passed through to the connection's select() method.
 * @param array $options
 *   Query options; the 'fetch' entry is replaced.
 *
 * @return \Drupal\Core\Database\Query\SelectInterface
 *   The select query returned by the source connection.
 */
protected function select($table, $alias = NULL, array $options = []) {
  $options['fetch'] = \PDO::FETCH_ASSOC;
  $connection = $this->getDatabase();
  return $connection->select($table, $alias, $options);
}
/**
 * Clones the source query and decorates it with migrate tags and metadata.
 *
 * The clone is stored on the plugin, tagged 'migrate' and
 * 'migrate_<migration id>', and given the migration as 'migration' metadata.
 *
 * @return \Drupal\Core\Database\Query\SelectInterface
 *   The query with additional tags and metadata.
 */
protected function prepareQuery() {
  $this->query = clone $this->query();

  $migration_tag = 'migrate_' . $this->migration->id();
  $this->query->addTag('migrate');
  $this->query->addTag($migration_tag);
  $this->query->addMetaData('migration', $this->migration);

  return $this->query;
}
/**
 * Builds and executes the source query, returning an iterator over its rows.
 *
 * On the first batch (batch counter is zero) the query is prepared and
 * extended with the map table join, the high water condition and any GROUP BY
 * additions. Later batches reuse the prepared query with a new range.
 *
 * @return \IteratorIterator
 *   Iterator over the executed statement, fetching associative arrays.
 *
 * @throws \Drupal\migrate\MigrateException
 *   Thrown when the configured batch_size is not an integer >= 0.
 */
protected function initializeIterator() {
  // Initialize the batch size.
  if ($this->batchSize == 0 && isset($this->configuration['batch_size'])) {
    // Valid batch sizes are integers >= 0.
    if (is_int($this->configuration['batch_size']) && ($this->configuration['batch_size']) >= 0) {
      $this->batchSize = $this->configuration['batch_size'];
    }
    else {
      throw new MigrateException("batch_size must be greater than or equal to zero");
    }
  }

  // If a batch has run the query is already setup.
  if ($this->batch == 0) {
    $this->prepareQuery();

    // The rules for determining what conditions to add to the query are as
    // follows (applying first applicable rule):
    // 1. If the map is joinable, join it. We will want to accept all rows
    //    which are either not in the map, or marked in the map as NEEDS_UPDATE.
    //    Note that if high water fields are in play, we want to accept all rows
    //    above the high water mark in addition to those selected by the map
    //    conditions, so we need to OR them together (but AND with any existing
    //    conditions in the query). So, ultimately the SQL condition will look
    //    like (original conditions) AND (map IS NULL OR map needs update
    //    OR above high water).
    $conditions = $this->query->orConditionGroup();
    $condition_added = FALSE;
    $added_fields = [];
    if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) {
      // Build the join to the map table. Because the source key could have
      // multiple fields, we need to build things up.
      $count = 1;
      $map_join = '';
      $delimiter = '';
      foreach ($this->getIds() as $field_name => $field_schema) {
        if (isset($field_schema['alias'])) {
          $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name);
        }
        $map_join .= "$delimiter$field_name = map.sourceid" . $count++;
        $delimiter = ' AND ';
      }

      $alias = $this->query->leftJoin($this->migration->getIdMap()
        ->getQualifiedMapTableName(), 'map', $map_join);
      $conditions->isNull($alias . '.sourceid1');
      $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
      $condition_added = TRUE;

      // And as long as we have the map table, add its data to the row.
      $n = count($this->getIds());
      for ($count = 1; $count <= $n; $count++) {
        $map_key = 'sourceid' . $count;
        $this->query->addField($alias, $map_key, "migrate_map_$map_key");
        $added_fields[] = "$alias.$map_key";
      }
      if ($n = count($this->migration->getDestinationIds())) {
        for ($count = 1; $count <= $n; $count++) {
          // Use the loop counter as-is: incrementing it here as well as in
          // the loop header would skip every second destination ID column.
          $map_key = 'destid' . $count;
          $this->query->addField($alias, $map_key, "migrate_map_$map_key");
          $added_fields[] = "$alias.$map_key";
        }
      }
      $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
      $added_fields[] = "$alias.source_row_status";
    }
    // 2. If we are using high water marks, also include rows above the mark.
    //    But, include all rows if the high water mark is not set.
    if ($this->getHighWaterProperty()) {
      $high_water_field = $this->getHighWaterField();
      $high_water = $this->getHighWater();
      // We check against NULL because 0 is an acceptable value for the high
      // water mark.
      if ($high_water !== NULL) {
        $conditions->condition($high_water_field, $high_water, '>');
        $condition_added = TRUE;
      }
      // Always sort by the high water field, to ensure that the first run
      // (before we have a high water value) also has the results in a
      // consistent order.
      $this->query->orderBy($high_water_field);
    }
    if ($condition_added) {
      $this->query->condition($conditions);
    }
    // If the query has a group by, our added fields need it too, to keep the
    // query valid.
    // @see https://dev.mysql.com/doc/refman/5.7/en/group-by-handling.html
    $group_by = $this->query->getGroupBy();
    if ($group_by && $added_fields) {
      foreach ($added_fields as $added_field) {
        $this->query->groupBy($added_field);
      }
    }
  }

  // Download data in batches for performance.
  if (($this->batchSize > 0)) {
    $this->query->range($this->batch * $this->batchSize, $this->batchSize);
  }
  $statement = $this->query->execute();
  $statement->setFetchMode(\PDO::FETCH_ASSOC);
  return new \IteratorIterator($statement);
}
/**
 * Advances the iterator by one row, rolling over to the next batch if needed.
 */
protected function fetchNextRow() {
  $this->getIterator()->next();

  // When batching, an exhausted iterator may only mean the current batch is
  // used up, so try to load the following batch.
  if ($this->batchSize > 0) {
    if (!$this->getIterator()->valid()) {
      $this->fetchNextBatch();
    }
  }
}
/**
 * Prepares query for the next set of data from the source database.
 *
 * Advances the batch counter, discards the current iterator so that
 * getIterator() builds a fresh one, and rewinds the new iterator.
 */
protected function fetchNextBatch() {
  // The counter must move forward before the iterator is rebuilt: the range
  // offset is derived from it, and without the increment the first batch
  // would be fetched again on every call.
  $this->batch++;
  unset($this->iterator);
  $this->getIterator()->rewind();
}
/**
 * Builds the select query that provides this plugin's source rows.
 *
 * Concrete source plugins must implement this.
 *
 * @return \Drupal\Core\Database\Query\SelectInterface
 *   The source query.
 */
abstract public function query();
/**
 * Counts the source rows by running the count form of the source query.
 *
 * @param bool $refresh
 *   Not used by this implementation.
 *
 * @return int
 *   The first field of the count query result, cast to an integer.
 */
public function count($refresh = FALSE) {
  $count_query = $this->query()->countQuery();
  $result = $count_query->execute();
  return (int) $result->fetchField();
}
398 * Checks if we can join against the map table.
400 * This function specifically catches issues when we're migrating with
401 * unique sets of credentials for the source and destination database.
404 * TRUE if we can join against the map table otherwise FALSE.
406 protected function mapJoinable() {
407 if (!$this->getIds()) {
410 // With batching, we want a later batch to return the same rows that would
411 // have been returned at the same point within a monolithic query. If we
412 // join to the map table, the first batch is writing to the map table and
413 // thus affecting the results of subsequent batches. To be safe, we avoid
414 // joining to the map table when batching.
415 if ($this->batchSize > 0) {
418 $id_map = $this->migration->getIdMap();
419 if (!$id_map instanceof Sql) {
422 $id_map_database_options = $id_map->getDatabase()->getConnectionOptions();
423 $source_database_options = $this->getDatabase()->getConnectionOptions();
425 // Special handling for sqlite which deals with files.
426 if ($id_map_database_options['driver'] === 'sqlite' &&
427 $source_database_options['driver'] === 'sqlite' &&
428 $id_map_database_options['database'] != $source_database_options['database']
433 // FALSE if driver is PostgreSQL and database doesn't match.
434 if ($id_map_database_options['driver'] === 'pgsql' &&
435 $source_database_options['driver'] === 'pgsql' &&
436 $id_map_database_options['database'] != $source_database_options['database']
441 foreach (['username', 'password', 'host', 'port', 'namespace', 'driver'] as $key) {
442 if (isset($source_database_options[$key])) {
443 if ($id_map_database_options[$key] != $source_database_options[$key]) {