3 namespace Drupal\migrate\Plugin\migrate\source;
5 use Drupal\Core\Database\ConnectionNotDefinedException;
6 use Drupal\Core\Database\Database;
7 use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
8 use Drupal\Core\State\StateInterface;
9 use Drupal\migrate\Exception\RequirementsException;
10 use Drupal\migrate\MigrateException;
11 use Drupal\migrate\Plugin\MigrationInterface;
12 use Drupal\migrate\Plugin\migrate\id_map\Sql;
13 use Drupal\migrate\Plugin\MigrateIdMapInterface;
14 use Drupal\migrate\Plugin\RequirementsInterface;
15 use Symfony\Component\DependencyInjection\ContainerInterface;
18 * Sources whose data may be fetched via DBTNG.
20 * By default, an existing database connection with key 'migrate' and target
21 * 'default' is used. These may be overridden with explicit 'key' and/or
22 * 'target' configuration keys. In addition, if the configuration key 'database'
23 * is present, it is used as a database connection information array to define
26 abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPluginInterface, RequirementsInterface {
31 * @var \Drupal\Core\Database\Query\SelectInterface
36 * The database object.
38 * @var \Drupal\Core\Database\Connection
43 * State service for retrieving database info.
45 * @var \Drupal\Core\State\StateInterface
50 * The count of the number of batches run.
57 * Number of records to fetch from the database during each batch.
59 * A value of zero indicates no batching is to be done.
63 protected $batchSize = 0;
68 public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, StateInterface $state) {
69 parent::__construct($configuration, $plugin_id, $plugin_definition, $migration);
70 $this->state = $state;
76 public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
82 $container->get('state')
87 * Prints the query string when the object is used as a string.
92 public function __toString() {
93 return (string) $this->query();
97 * Gets the database connection object.
99 * @return \Drupal\Core\Database\Connection
100 * The database connection.
102 public function getDatabase() {
103 if (!isset($this->database)) {
104 // See if the database info is in state - if not, fallback to
106 if (isset($this->configuration['database_state_key'])) {
107 $this->database = $this->setUpDatabase($this->state->get($this->configuration['database_state_key']));
109 elseif (($fallback_state_key = $this->state->get('migrate.fallback_state_key'))) {
110 $this->database = $this->setUpDatabase($this->state->get($fallback_state_key));
113 $this->database = $this->setUpDatabase($this->configuration);
116 return $this->database;
120 * Gets a connection to the referenced database.
122 * This method will add the database connection if necessary.
124 * @param array $database_info
125 * Configuration for the source database connection. The keys are:
126 * 'key' - The database connection key.
127 * 'target' - The database connection target.
128 * 'database' - Database configuration array as accepted by
129 * Database::addConnectionInfo.
131 * @return \Drupal\Core\Database\Connection
132 * The connection to use for this plugin's queries.
134 * @throws \Drupal\migrate\Exception\RequirementsException
135 * Thrown if no source database connection is configured.
137 protected function setUpDatabase(array $database_info) {
138 if (isset($database_info['key'])) {
139 $key = $database_info['key'];
142 // If there is no explicit database configuration at all, fall back to a
143 // connection named 'migrate'.
146 if (isset($database_info['target'])) {
147 $target = $database_info['target'];
152 if (isset($database_info['database'])) {
153 Database::addConnectionInfo($key, $target, $database_info['database']);
156 $connection = Database::getConnection($target, $key);
158 catch (ConnectionNotDefinedException $e) {
159 // If we fell back to the magic 'migrate' connection and it doesn't exist,
160 // treat the lack of the connection as a RequirementsException.
161 if ($key == 'migrate') {
162 throw new RequirementsException("No database connection configured for source plugin " . $this->pluginId, [], 0, $e);
174 public function checkRequirements() {
175 if ($this->pluginDefinition['requirements_met'] === TRUE) {
176 $this->getDatabase();
181 * Wrapper for database select.
183 protected function select($table, $alias = NULL, array $options = []) {
184 $options['fetch'] = \PDO::FETCH_ASSOC;
185 return $this->getDatabase()->select($table, $alias, $options);
189 * Adds tags and metadata to the query.
191 * @return \Drupal\Core\Database\Query\SelectInterface
192 * The query with additional tags and metadata.
194 protected function prepareQuery() {
195 $this->query = clone $this->query();
196 $this->query->addTag('migrate');
197 $this->query->addTag('migrate_' . $this->migration->id());
198 $this->query->addMetaData('migration', $this->migration);
206 protected function initializeIterator() {
207 // Initialize the batch size.
208 if ($this->batchSize == 0 && isset($this->configuration['batch_size'])) {
209 // Valid batch sizes are integers >= 0.
210 if (is_int($this->configuration['batch_size']) && ($this->configuration['batch_size']) >= 0) {
211 $this->batchSize = $this->configuration['batch_size'];
214 throw new MigrateException("batch_size must be greater than or equal to zero");
218 // If a batch has run the query is already setup.
219 if ($this->batch == 0) {
220 $this->prepareQuery();
222 // Get the key values, for potential use in joining to the map table.
225 // The rules for determining what conditions to add to the query are as
226 // follows (applying first applicable rule):
227 // 1. If the map is joinable, join it. We will want to accept all rows
228 // which are either not in the map, or marked in the map as NEEDS_UPDATE.
229 // Note that if high water fields are in play, we want to accept all rows
230 // above the high water mark in addition to those selected by the map
231 // conditions, so we need to OR them together (but AND with any existing
232 // conditions in the query). So, ultimately the SQL condition will look
233 // like (original conditions) AND (map IS NULL OR map needs update
234 // OR above high water).
235 $conditions = $this->query->orConditionGroup();
236 $condition_added = FALSE;
238 if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) {
239 // Build the join to the map table. Because the source key could have
240 // multiple fields, we need to build things up.
244 foreach ($this->getIds() as $field_name => $field_schema) {
245 if (isset($field_schema['alias'])) {
246 $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name);
248 $map_join .= "$delimiter$field_name = map.sourceid" . $count++;
249 $delimiter = ' AND ';
252 $alias = $this->query->leftJoin($this->migration->getIdMap()
253 ->getQualifiedMapTableName(), 'map', $map_join);
254 $conditions->isNull($alias . '.sourceid1');
255 $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
256 $condition_added = TRUE;
258 // And as long as we have the map table, add its data to the row.
259 $n = count($this->getIds());
260 for ($count = 1; $count <= $n; $count++) {
261 $map_key = 'sourceid' . $count;
262 $this->query->addField($alias, $map_key, "migrate_map_$map_key");
263 $added_fields[] = "$alias.$map_key";
265 if ($n = count($this->migration->getDestinationIds())) {
266 for ($count = 1; $count <= $n; $count++) {
267 $map_key = 'destid' . $count++;
268 $this->query->addField($alias, $map_key, "migrate_map_$map_key");
269 $added_fields[] = "$alias.$map_key";
272 $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
273 $added_fields[] = "$alias.source_row_status";
275 // 2. If we are using high water marks, also include rows above the mark.
276 // But, include all rows if the high water mark is not set.
277 if ($this->getHighWaterProperty() && ($high_water = $this->getHighWater())) {
278 $high_water_field = $this->getHighWaterField();
279 $conditions->condition($high_water_field, $high_water, '>');
280 $this->query->orderBy($high_water_field);
281 $condition_added = TRUE;
283 if ($condition_added) {
284 $this->query->condition($conditions);
286 // If the query has a group by, our added fields need it too, to keep the
288 // @see https://dev.mysql.com/doc/refman/5.7/en/group-by-handling.html
289 $group_by = $this->query->getGroupBy();
290 if ($group_by && $added_fields) {
291 foreach ($added_fields as $added_field) {
292 $this->query->groupBy($added_field);
297 // Download data in batches for performance.
298 if (($this->batchSize > 0)) {
299 $this->query->range($this->batch * $this->batchSize, $this->batchSize);
301 return new \IteratorIterator($this->query->execute());
305 * Position the iterator to the following row.
307 protected function fetchNextRow() {
308 $this->getIterator()->next();
309 // We might be out of data entirely, or just out of data in the current
310 // batch. Attempt to fetch the next batch and see.
311 if ($this->batchSize > 0 && !$this->getIterator()->valid()) {
312 $this->fetchNextBatch();
317 * Prepares query for the next set of data from the source database.
319 protected function fetchNextBatch() {
321 unset($this->iterator);
322 $this->getIterator()->rewind();
326 * @return \Drupal\Core\Database\Query\SelectInterface
328 abstract public function query();
333 public function count() {
334 return $this->query()->countQuery()->execute()->fetchField();
338 * Checks if we can join against the map table.
340 * This function specifically catches issues when we're migrating with
341 * unique sets of credentials for the source and destination database.
344 * TRUE if we can join against the map table otherwise FALSE.
346 protected function mapJoinable() {
347 if (!$this->getIds()) {
350 // With batching, we want a later batch to return the same rows that would
351 // have been returned at the same point within a monolithic query. If we
352 // join to the map table, the first batch is writing to the map table and
353 // thus affecting the results of subsequent batches. To be safe, we avoid
354 // joining to the map table when batching.
355 if ($this->batchSize > 0) {
358 $id_map = $this->migration->getIdMap();
359 if (!$id_map instanceof Sql) {
362 $id_map_database_options = $id_map->getDatabase()->getConnectionOptions();
363 $source_database_options = $this->getDatabase()->getConnectionOptions();
365 // Special handling for sqlite which deals with files.
366 if ($id_map_database_options['driver'] === 'sqlite' &&
367 $source_database_options['driver'] === 'sqlite' &&
368 $id_map_database_options['database'] != $source_database_options['database']
373 // FALSE if driver is PostgreSQL and database doesn't match.
374 if ($id_map_database_options['driver'] === 'pgsql' &&
375 $source_database_options['driver'] === 'pgsql' &&
376 $id_map_database_options['database'] != $source_database_options['database']
381 foreach (['username', 'password', 'host', 'port', 'namespace', 'driver'] as $key) {
382 if (isset($source_database_options[$key])) {
383 if ($id_map_database_options[$key] != $source_database_options[$key]) {