3c6d4c49a38facf7a45d7d7998bc9a74851b33ad
[yaffs-website] / web / core / modules / migrate / src / Plugin / migrate / source / SqlBase.php
1 <?php
2
3 namespace Drupal\migrate\Plugin\migrate\source;
4
5 use Drupal\Core\Database\ConnectionNotDefinedException;
6 use Drupal\Core\Database\Database;
7 use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
8 use Drupal\Core\State\StateInterface;
9 use Drupal\migrate\Exception\RequirementsException;
10 use Drupal\migrate\MigrateException;
11 use Drupal\migrate\Plugin\MigrationInterface;
12 use Drupal\migrate\Plugin\migrate\id_map\Sql;
13 use Drupal\migrate\Plugin\MigrateIdMapInterface;
14 use Drupal\migrate\Plugin\RequirementsInterface;
15 use Symfony\Component\DependencyInjection\ContainerInterface;
16
17 /**
18  * Sources whose data may be fetched via DBTNG.
19  *
20  * By default, an existing database connection with key 'migrate' and target
21  * 'default' is used. These may be overridden with explicit 'key' and/or
22  * 'target' configuration keys. In addition, if the configuration key 'database'
23  * is present, it is used as a database connection information array to define
24  * the connection.
25  */
26 abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPluginInterface, RequirementsInterface {
27
28   /**
29    * The query string.
30    *
31    * @var \Drupal\Core\Database\Query\SelectInterface
32    */
33   protected $query;
34
35   /**
36    * The database object.
37    *
38    * @var \Drupal\Core\Database\Connection
39    */
40   protected $database;
41
42   /**
43    * State service for retrieving database info.
44    *
45    * @var \Drupal\Core\State\StateInterface
46    */
47   protected $state;
48
49   /**
50    * The count of the number of batches run.
51    *
52    * @var int
53    */
54   protected $batch = 0;
55
56   /**
57    * Number of records to fetch from the database during each batch.
58    *
59    * A value of zero indicates no batching is to be done.
60    *
61    * @var int
62    */
63   protected $batchSize = 0;
64
65   /**
66    * {@inheritdoc}
67    */
68   public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, StateInterface $state) {
69     parent::__construct($configuration, $plugin_id, $plugin_definition, $migration);
70     $this->state = $state;
71   }
72
73   /**
74    * {@inheritdoc}
75    */
76   public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
77     return new static(
78       $configuration,
79       $plugin_id,
80       $plugin_definition,
81       $migration,
82       $container->get('state')
83     );
84   }
85
86   /**
87    * Prints the query string when the object is used as a string.
88    *
89    * @return string
90    *   The query string.
91    */
92   public function __toString() {
93     return (string) $this->query();
94   }
95
96   /**
97    * Gets the database connection object.
98    *
99    * @return \Drupal\Core\Database\Connection
100    *   The database connection.
101    */
102   public function getDatabase() {
103     if (!isset($this->database)) {
104       // See if the database info is in state - if not, fallback to
105       // configuration.
106       if (isset($this->configuration['database_state_key'])) {
107         $this->database = $this->setUpDatabase($this->state->get($this->configuration['database_state_key']));
108       }
109       elseif (($fallback_state_key = $this->state->get('migrate.fallback_state_key'))) {
110         $this->database = $this->setUpDatabase($this->state->get($fallback_state_key));
111       }
112       else {
113         $this->database = $this->setUpDatabase($this->configuration);
114       }
115     }
116     return $this->database;
117   }
118
119   /**
120    * Gets a connection to the referenced database.
121    *
122    * This method will add the database connection if necessary.
123    *
124    * @param array $database_info
125    *   Configuration for the source database connection. The keys are:
126    *    'key' - The database connection key.
127    *    'target' - The database connection target.
128    *    'database' - Database configuration array as accepted by
129    *      Database::addConnectionInfo.
130    *
131    * @return \Drupal\Core\Database\Connection
132    *   The connection to use for this plugin's queries.
133    *
134    * @throws \Drupal\migrate\Exception\RequirementsException
135    *   Thrown if no source database connection is configured.
136    */
137   protected function setUpDatabase(array $database_info) {
138     if (isset($database_info['key'])) {
139       $key = $database_info['key'];
140     }
141     else {
142       // If there is no explicit database configuration at all, fall back to a
143       // connection named 'migrate'.
144       $key = 'migrate';
145     }
146     if (isset($database_info['target'])) {
147       $target = $database_info['target'];
148     }
149     else {
150       $target = 'default';
151     }
152     if (isset($database_info['database'])) {
153       Database::addConnectionInfo($key, $target, $database_info['database']);
154     }
155     try {
156       $connection = Database::getConnection($target, $key);
157     }
158     catch (ConnectionNotDefinedException $e) {
159       // If we fell back to the magic 'migrate' connection and it doesn't exist,
160       // treat the lack of the connection as a RequirementsException.
161       if ($key == 'migrate') {
162         throw new RequirementsException("No database connection configured for source plugin " . $this->pluginId, [], 0, $e);
163       }
164       else {
165         throw $e;
166       }
167     }
168     return $connection;
169   }
170
171   /**
172    * {@inheritdoc}
173    */
174   public function checkRequirements() {
175     if ($this->pluginDefinition['requirements_met'] === TRUE) {
176       $this->getDatabase();
177     }
178   }
179
180   /**
181    * Wrapper for database select.
182    */
183   protected function select($table, $alias = NULL, array $options = []) {
184     $options['fetch'] = \PDO::FETCH_ASSOC;
185     return $this->getDatabase()->select($table, $alias, $options);
186   }
187
188   /**
189    * Adds tags and metadata to the query.
190    *
191    * @return \Drupal\Core\Database\Query\SelectInterface
192    *   The query with additional tags and metadata.
193    */
194   protected function prepareQuery() {
195     $this->query = clone $this->query();
196     $this->query->addTag('migrate');
197     $this->query->addTag('migrate_' . $this->migration->id());
198     $this->query->addMetaData('migration', $this->migration);
199
200     return $this->query;
201   }
202
203   /**
204    * {@inheritdoc}
205    */
206   protected function initializeIterator() {
207     // Initialize the batch size.
208     if ($this->batchSize == 0 && isset($this->configuration['batch_size'])) {
209       // Valid batch sizes are integers >= 0.
210       if (is_int($this->configuration['batch_size']) && ($this->configuration['batch_size']) >= 0) {
211         $this->batchSize = $this->configuration['batch_size'];
212       }
213       else {
214         throw new MigrateException("batch_size must be greater than or equal to zero");
215       }
216     }
217
218     // If a batch has run the query is already setup.
219     if ($this->batch == 0) {
220       $this->prepareQuery();
221
222       // Get the key values, for potential use in joining to the map table.
223       $keys = [];
224
225       // The rules for determining what conditions to add to the query are as
226       // follows (applying first applicable rule):
227       // 1. If the map is joinable, join it. We will want to accept all rows
228       //    which are either not in the map, or marked in the map as NEEDS_UPDATE.
229       //    Note that if high water fields are in play, we want to accept all rows
230       //    above the high water mark in addition to those selected by the map
231       //    conditions, so we need to OR them together (but AND with any existing
232       //    conditions in the query). So, ultimately the SQL condition will look
233       //    like (original conditions) AND (map IS NULL OR map needs update
234       //      OR above high water).
235       $conditions = $this->query->orConditionGroup();
236       $condition_added = FALSE;
237       $added_fields = [];
238       if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) {
239         // Build the join to the map table. Because the source key could have
240         // multiple fields, we need to build things up.
241         $count = 1;
242         $map_join = '';
243         $delimiter = '';
244         foreach ($this->getIds() as $field_name => $field_schema) {
245           if (isset($field_schema['alias'])) {
246             $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name);
247           }
248           $map_join .= "$delimiter$field_name = map.sourceid" . $count++;
249           $delimiter = ' AND ';
250         }
251
252         $alias = $this->query->leftJoin($this->migration->getIdMap()
253           ->getQualifiedMapTableName(), 'map', $map_join);
254         $conditions->isNull($alias . '.sourceid1');
255         $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
256         $condition_added = TRUE;
257
258         // And as long as we have the map table, add its data to the row.
259         $n = count($this->getIds());
260         for ($count = 1; $count <= $n; $count++) {
261           $map_key = 'sourceid' . $count;
262           $this->query->addField($alias, $map_key, "migrate_map_$map_key");
263           $added_fields[] = "$alias.$map_key";
264         }
265         if ($n = count($this->migration->getDestinationIds())) {
266           for ($count = 1; $count <= $n; $count++) {
267             $map_key = 'destid' . $count++;
268             $this->query->addField($alias, $map_key, "migrate_map_$map_key");
269             $added_fields[] = "$alias.$map_key";
270           }
271         }
272         $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
273         $added_fields[] = "$alias.source_row_status";
274       }
275       // 2. If we are using high water marks, also include rows above the mark.
276       //    But, include all rows if the high water mark is not set.
277       if ($this->getHighWaterProperty() && ($high_water = $this->getHighWater())) {
278         $high_water_field = $this->getHighWaterField();
279         $conditions->condition($high_water_field, $high_water, '>');
280         $this->query->orderBy($high_water_field);
281         $condition_added = TRUE;
282       }
283       if ($condition_added) {
284         $this->query->condition($conditions);
285       }
286       // If the query has a group by, our added fields need it too, to keep the
287       // query valid.
288       // @see https://dev.mysql.com/doc/refman/5.7/en/group-by-handling.html
289       $group_by = $this->query->getGroupBy();
290       if ($group_by && $added_fields) {
291         foreach ($added_fields as $added_field) {
292           $this->query->groupBy($added_field);
293         }
294       }
295     }
296
297     // Download data in batches for performance.
298     if (($this->batchSize > 0)) {
299       $this->query->range($this->batch * $this->batchSize, $this->batchSize);
300     }
301     return new \IteratorIterator($this->query->execute());
302   }
303
304   /**
305    * Position the iterator to the following row.
306    */
307   protected function fetchNextRow() {
308     $this->getIterator()->next();
309     // We might be out of data entirely, or just out of data in the current
310     // batch. Attempt to fetch the next batch and see.
311     if ($this->batchSize > 0 && !$this->getIterator()->valid()) {
312       $this->fetchNextBatch();
313     }
314   }
315
316   /**
317    * Prepares query for the next set of data from the source database.
318    */
319   protected function fetchNextBatch() {
320     $this->batch++;
321     unset($this->iterator);
322     $this->getIterator()->rewind();
323   }
324
325   /**
326    * @return \Drupal\Core\Database\Query\SelectInterface
327    */
328   abstract public function query();
329
330   /**
331    * {@inheritdoc}
332    */
333   public function count() {
334     return $this->query()->countQuery()->execute()->fetchField();
335   }
336
337   /**
338    * Checks if we can join against the map table.
339    *
340    * This function specifically catches issues when we're migrating with
341    * unique sets of credentials for the source and destination database.
342    *
343    * @return bool
344    *   TRUE if we can join against the map table otherwise FALSE.
345    */
346   protected function mapJoinable() {
347     if (!$this->getIds()) {
348       return FALSE;
349     }
350     // With batching, we want a later batch to return the same rows that would
351     // have been returned at the same point within a monolithic query. If we
352     // join to the map table, the first batch is writing to the map table and
353     // thus affecting the results of subsequent batches. To be safe, we avoid
354     // joining to the map table when batching.
355     if ($this->batchSize > 0) {
356       return FALSE;
357     }
358     $id_map = $this->migration->getIdMap();
359     if (!$id_map instanceof Sql) {
360       return FALSE;
361     }
362     $id_map_database_options = $id_map->getDatabase()->getConnectionOptions();
363     $source_database_options = $this->getDatabase()->getConnectionOptions();
364
365     // Special handling for sqlite which deals with files.
366     if ($id_map_database_options['driver'] === 'sqlite' &&
367       $source_database_options['driver'] === 'sqlite' &&
368       $id_map_database_options['database'] != $source_database_options['database']
369     ) {
370       return FALSE;
371     }
372
373     // FALSE if driver is PostgreSQL and database doesn't match.
374     if ($id_map_database_options['driver'] === 'pgsql' &&
375       $source_database_options['driver'] === 'pgsql' &&
376       $id_map_database_options['database'] != $source_database_options['database']
377       ) {
378       return FALSE;
379     }
380
381     foreach (['username', 'password', 'host', 'port', 'namespace', 'driver'] as $key) {
382       if (isset($source_database_options[$key])) {
383         if ($id_map_database_options[$key] != $source_database_options[$key]) {
384           return FALSE;
385         }
386       }
387     }
388     return TRUE;
389   }
390
391 }