3 namespace Drupal\migrate\Plugin\migrate\source;
5 use Drupal\Core\Plugin\PluginBase;
6 use Drupal\migrate\Event\MigrateRollbackEvent;
7 use Drupal\migrate\Event\RollbackAwareInterface;
8 use Drupal\migrate\Plugin\MigrationInterface;
9 use Drupal\migrate\MigrateException;
10 use Drupal\migrate\MigrateSkipRowException;
11 use Drupal\migrate\Plugin\MigrateIdMapInterface;
12 use Drupal\migrate\Plugin\MigrateSourceInterface;
13 use Drupal\migrate\Row;
16 * The base class for source plugins.
18 * Available configuration keys:
19 * - cache_counts: (optional) If set, cache the source count.
20 * - skip_count: (optional) If set, do not attempt to count the source.
21 * - track_changes: (optional) If set, track changes to incoming data.
22 * - high_water_property: (optional) An array with a 'name' key and an
23 * optional 'alias' key (the table alias). The named property is typically a
24 * timestamp or serial ID identifying the last imported record. Only content
25 * with a higher value will be imported.
27 * The high_water_property and track_changes are mutually exclusive.
33 * plugin: some_source_plugin_name
38 * This example uses the plugin "some_source_plugin_name" and caches the count
39 * of available source records to save calculating it every time count() is
40 * called. Changes to incoming data are watched (because track_changes is true),
41 * which can affect the result of prepareRow().
47 * plugin: some_source_plugin_name
49 * high_water_property:
54 * In this example, skip_count is true which means count() will not attempt to
55 * count the available source records, but just always return -1 instead. The
56 * high_water_property defines which field marks the last imported row of the
57 * migration. It is converted into the field name used in a SQL condition,
58 * such as 'n.changed', or just 'changed' if no alias is given.
60 * @see \Drupal\migrate\Plugin\MigratePluginManager
61 * @see \Drupal\migrate\Annotation\MigrateSource
62 * @see \Drupal\migrate\Plugin\MigrateSourceInterface
67 abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface, RollbackAwareInterface {
70 * The module handler service.
72 * @var \Drupal\Core\Extension\ModuleHandlerInterface
74 protected $moduleHandler;
77 * The entity migration object.
79 * @var \Drupal\migrate\Plugin\MigrationInterface
84 * The current row from the query.
86 * @var \Drupal\migrate\Row
88 protected $currentRow;
91 * The primary key of the current row.
95 protected $currentSourceIds;
98 * Information on the property used as the high-water mark.
100 * Array of 'name' and (optional) db 'alias' properties used for high-water
105 protected $highWaterProperty = [];
// NOTE(review): the constructor overwrites this with FALSE when no
// high_water_property is configured, so at runtime it is array|false.
108 * The key-value storage for the high-water value.
110 * @var \Drupal\Core\KeyValueStore\KeyValueStoreInterface
112 protected $highWaterStorage;
115 * The high water mark at the beginning of the import operation.
117 * If the source has a property for tracking changes (like Drupal has
118 * node.changed) then this is the highest value of those imported so far.
122 protected $originalHighWater;
125 * Whether this instance should cache the source count.
129 protected $cacheCounts = FALSE;
132 * Key to use for caching counts.
139 * Whether this instance should not attempt to count the source.
143 protected $skipCount = FALSE;
146 * Flags whether to track changes to incoming data.
148 * If TRUE, we will maintain hashed source rows to determine whether incoming
153 protected $trackChanges = FALSE;
156 * Flags whether source plugin will read the map row and add to data row.
158 * By default, next() will directly read the map row and add it to the data
159 * row. A source plugin implementation may do this itself (in particular, the
160 * SQL source can incorporate the map table into the query) - if so, it should
161 * set this TRUE so we don't duplicate the effort.
165 protected $mapRowAdded = FALSE;
170 * @var \Drupal\Core\Cache\CacheBackendInterface
175 * The migration ID map.
177 * @var \Drupal\migrate\Plugin\MigrateIdMapInterface
182 * The iterator to iterate over the source rows.
/**
 * Constructs a source plugin bound to a migration.
 *
 * @param array $configuration
 *   The plugin configuration. Keys read here: cache_counts, skip_count,
 *   track_changes, cache_key and high_water_property.
 * @param string $plugin_id
 *   The plugin ID.
 * @param mixed $plugin_definition
 *   The plugin definition.
 * @param \Drupal\migrate\Plugin\MigrationInterface $migration
 *   The migration this source belongs to.
 *
 * @throws \Drupal\migrate\MigrateException
 *   When both a high-water property and change tracking are configured.
 */
public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration) {
  parent::__construct($configuration, $plugin_id, $plugin_definition);
  $this->migration = $migration;

  // Boolean behaviour flags: property name => configuration key.
  $flags = [
    'cacheCounts' => 'cache_counts',
    'skipCount' => 'skip_count',
    'trackChanges' => 'track_changes',
  ];
  foreach ($flags as $property_name => $key) {
    // Keep the property default unless the key is actually configured.
    if (!isset($configuration[$key])) {
      continue;
    }
    $this->$property_name = (bool) $configuration[$key];
  }

  $this->cacheKey = empty($configuration['cache_key']) ? NULL : $configuration['cache_key'];
  $this->idMap = $this->migration->getIdMap();
  $this->highWaterProperty = empty($configuration['high_water_property']) ? FALSE : $configuration['high_water_property'];

  // Remember the mark stored by the previous run so aboveHighwater() can
  // compare incoming rows against it.
  if ($this->highWaterProperty) {
    $this->originalHighWater = $this->getHighWater();
  }

  // High-water marks and change tracking are alternative ways of detecting
  // updated rows; configuring both is an error.
  if ($this->highWaterProperty && $this->trackChanges) {
    throw new MigrateException('You should either use a highwater mark or track changes not both. They are both designed to solve the same problem');
  }
}
/**
 * Initializes the iterator with the source data.
 *
 * Called by getIterator() the first time rows are needed, and again by
 * doCount() to count rows when the iterator is not \Countable, so it must be
 * able to produce a fresh iterator each time it is called.
 *
 * @return \Iterator
 *   An iterator over the source rows. Each element must be an array of source
 *   values, since next() merges it with the plugin configuration using the
 *   array union operator.
 */
abstract protected function initializeIterator();
/**
 * Gets the module handler, fetching it from the container on first use.
 *
 * @return \Drupal\Core\Extension\ModuleHandlerInterface
 *   The module handler.
 */
protected function getModuleHandler() {
  if (isset($this->moduleHandler)) {
    return $this->moduleHandler;
  }
  $this->moduleHandler = \Drupal::moduleHandler();
  return $this->moduleHandler;
}
// Gives modules a chance to alter or skip the row by invoking
// hook_migrate_prepare_row() and hook_migrate_MIGRATION_ID_prepare_row(),
// each with the row, this source plugin and the migration.
240 public function prepareRow(Row $row) {
243 $result_hook = $this->getModuleHandler()->invokeAll('migrate_prepare_row', [$row, $this, $this->migration]);
244 $result_named_hook = $this->getModuleHandler()->invokeAll('migrate_' . $this->migration->id() . '_prepare_row', [$row, $this, $this->migration]);
245 // We will skip if any hook returned FALSE.
// NOTE(review): in_array() is called without its strict flag, so any hook
// result loosely equal to FALSE (e.g. 0, '' or '0') also triggers a skip.
// Confirm whether only a literal FALSE was intended.
246 $skip = ($result_hook && in_array(FALSE, $result_hook)) || ($result_named_hook && in_array(FALSE, $result_named_hook));
// Handles a MigrateSkipRowException: reads its save-to-map flag and records
// any non-empty message against the row's source IDs as informational.
249 catch (MigrateSkipRowException $e) {
251 $save_to_map = $e->getSaveToMap();
252 if ($message = trim($e->getMessage())) {
253 $this->idMap->saveMessage($row->getSourceIdValues(), $message, MigrationInterface::MESSAGE_INFORMATIONAL);
257 // We're explicitly skipping this row - keep track in the map table.
259 // Make sure we replace any previous messages for this item with any
262 $this->idMap->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);
263 $this->currentRow = NULL;
264 $this->currentSourceIds = NULL;
268 elseif ($this->trackChanges) {
269 // When tracking changed data, we want to quietly skip (rather than
270 // "ignore") rows with changes. The caller needs to make that decision,
271 // so we need to provide them with the necessary information (before and
/**
 * Returns the iterator that will yield the row arrays to be processed.
 *
 * The iterator is built by initializeIterator() the first time it is needed
 * and reused on later calls.
 *
 * @return \Iterator
 *   The iterator that will yield the row arrays to be processed.
 */
protected function getIterator() {
  if (isset($this->iterator)) {
    return $this->iterator;
  }
  $this->iterator = $this->initializeIterator();
  return $this->iterator;
}
/**
 * Returns the row selected by the most recent call to next().
 *
 * @return \Drupal\migrate\Row|null
 *   The current row, or NULL when there is no current row.
 */
public function current() {
  $row = $this->currentRow;
  return $row;
}
/**
 * Gets the iterator key.
 *
 * Implementation of \Iterator::key(), called when entering a loop iteration.
 * The key must be a scalar, so the current source IDs are serialized. Callers
 * that need the IDs themselves should prefer getCurrentIds().
 *
 * @return string
 *   The serialized source IDs of the current row.
 */
public function key() {
  $ids = $this->currentSourceIds;
  return serialize($ids);
}
/**
 * Checks whether the iterator is currently valid.
 *
 * Implementation of \Iterator::valid(), called at the top of each loop
 * iteration: TRUE continues the loop and FALSE ends it.
 *
 * @return bool
 *   TRUE while a current row is available.
 */
public function valid() {
  return $this->currentRow !== NULL;
}
321 * Rewinds the iterator.
323 * Implementation of \Iterator::rewind() - subclasses of SourcePluginBase
324 * should implement initializeIterator() to do any class-specific setup for
325 * iterating source records.
327 public function rewind() {
// Reset the underlying source iterator; getIterator() builds it via
// initializeIterator() on first use.
328 $this->getIterator()->rewind();
335 * The migration iterates over rows returned by the source plugin. This
336 * method determines the next row which will be processed and imported into
339 * The method tracks the source and destination IDs using the ID map plugin.
341 * This also handles high-water support, which allows re-importing rows from
342 * a previous migration run that have changed in the meantime. This is done
343 * by specifying a high-water field, whose value is compared with the mark
344 * stored by the previous run of the migration (originalHighWater).
346 public function next() {
347 $this->currentSourceIds = NULL;
348 $this->currentRow = NULL;
350 // In order to find the next row we want to process, we ask the source
351 // plugin for the next possible row.
352 while (!isset($this->currentRow) && $this->getIterator()->valid()) {
// Build the row data from the raw source values. With the array union
// operator, source values win over configuration keys of the same name.
354 $row_data = $this->getIterator()->current() + $this->configuration;
// Advance the underlying iterator now that the current values are read.
355 $this->fetchNextRow();
356 $row = new Row($row_data, $this->migration->getSourcePlugin()->getIds(), $this->migration->getDestinationIds());
358 // Populate the source key for this row.
359 $this->currentSourceIds = $row->getSourceIdValues();
361 // Pick up the existing map row, if any, unless fetchNextRow() did it.
362 if (!$this->mapRowAdded && ($id_map = $this->idMap->getRowBySource($this->currentSourceIds))) {
363 $row->setIdMap($id_map);
366 // Clear any previous messages for this row before potentially adding
368 if (!empty($this->currentSourceIds)) {
369 $this->idMap->delete($this->currentSourceIds, TRUE);
372 // Preparing the row gives source plugins the chance to skip.
373 if ($this->prepareRow($row) === FALSE) {
377 // Check whether the row needs processing.
378 // 1. This row has not been imported yet.
379 // 2. Explicitly set to update.
380 // 3. The row is newer than the current highwater mark.
381 // 4. If no such property exists then try by checking the hash of the row.
382 if (!$row->getIdMap() || $row->needsUpdate() || $this->aboveHighwater($row) || $this->rowChanged($row)) {
383 $this->currentRow = $row->freezeSource();
// NOTE(review): this appears to run for every row reaching this point, not
// only rows selected by the check above. It also runs before the caller has
// imported the row. The saved value is the row's own value, not a running
// maximum, so this presumably relies on the source returning rows in
// ascending high-water order. Confirm.
386 if ($this->getHighWaterProperty()) {
387 $this->saveHighWater($row->getSourceProperty($this->highWaterProperty['name']));
/**
 * Advances the source iterator to the following row.
 *
 * Source plugins that add the map row to the fetched data themselves (see
 * $mapRowAdded) may override this to do that extra work.
 */
protected function fetchNextRow() {
  $iterator = $this->getIterator();
  $iterator->next();
}
/**
 * Checks if the incoming data is newer than what we've previously imported.
 *
 * @param \Drupal\migrate\Row $row
 *   The row we're importing.
 *
 * @return bool
 *   TRUE if the row's high-water value is greater than the mark stored at the
 *   start of this run. Always FALSE when no high-water property is set.
 */
protected function aboveHighwater(Row $row) {
  if (!$this->getHighWaterProperty()) {
    return FALSE;
  }
  $value = $row->getSourceProperty($this->highWaterProperty['name']);
  return $value > $this->originalHighWater;
}
/**
 * Checks if the incoming row has changed since our last import.
 *
 * @param \Drupal\migrate\Row $row
 *   The row we're importing.
 *
 * @return bool
 *   TRUE if change tracking is enabled and the row reports a change,
 *   otherwise FALSE.
 */
protected function rowChanged(Row $row) {
  if (!$this->trackChanges) {
    return FALSE;
  }
  return (bool) $row->changed();
}
/**
 * Gets the source ID values of the current row.
 *
 * @return mixed
 *   The value next() obtained from Row::getSourceIdValues() (presumably an
 *   array of source IDs), or NULL when there is no current row.
 */
public function getCurrentIds() {
  $ids = $this->currentSourceIds;
  return $ids;
}
433 * Gets the source count.
435 * Returns a count of available source records, from the cache if appropriate.
436 * Returns -1 if the source is not countable.
438 * @param bool $refresh
439 * (optional) Whether or not to refresh the count. Defaults to FALSE. Not
440 * all implementations support the refresh flag. In such instances this
441 * parameter is ignored and the result of calling the method will always be
447 public function count($refresh = FALSE) {
448 if ($this->skipCount) {
// Without a configured cache_key, fall back to a hash of the plugin ID.
// NOTE(review): all migrations using the same source plugin ID and no
// explicit cache_key then share one cached count. Confirm this is intended.
452 if (!isset($this->cacheKey)) {
453 $this->cacheKey = hash('sha256', $this->getPluginId());
456 // If a refresh is requested, or we're not caching counts, ask the derived
457 // class to get the count from the source.
458 if ($refresh || !$this->cacheCounts) {
459 $count = $this->doCount();
// NOTE(review): the fresh count is written to the cache even when
// cache_counts is disabled.
460 $this->getCache()->set($this->cacheKey, $count);
463 // Caching is in play, first try to retrieve a cached count.
// NOTE(review): the second parameter of CacheBackendInterface::get() is the
// boolean $allow_invalid. The truthy string 'cache' therefore also returns
// invalidated entries, which looks unintended. Confirm.
464 $cache_object = $this->getCache()->get($this->cacheKey, 'cache');
465 if (is_object($cache_object)) {
467 $count = $cache_object->data;
470 // No cached count, ask the derived class to count 'em up, and cache
472 $count = $this->doCount();
473 $this->getCache()->set($this->cacheKey, $count);
480 * Gets the cache object.
482 * @return \Drupal\Core\Cache\CacheBackendInterface
485 protected function getCache() {
// Lazily load the 'migrate' cache bin (used by count()) on first access.
486 if (!isset($this->cache)) {
487 $this->cache = \Drupal::cache('migrate');
/**
 * Counts the source rows.
 *
 * Uses the iterator's own count() when it implements \Countable. Otherwise,
 * counts the rows of a separately initialized iterator.
 *
 * @return int
 *   The number of source rows.
 */
protected function doCount() {
  $iterator = $this->getIterator();
  if ($iterator instanceof \Countable) {
    return $iterator->count();
  }
  // iterator_count() walks the whole iterator, so count a fresh one from
  // initializeIterator() instead of moving the cached iterator that
  // rewind()/next() rely on.
  return iterator_count($this->initializeIterator());
}
/**
 * Gets the key-value store that holds high-water marks.
 *
 * @return \Drupal\Core\KeyValueStore\KeyValueStoreInterface
 *   The 'migrate:high_water' key-value collection, loaded on first use.
 */
protected function getHighWaterStorage() {
  if (isset($this->highWaterStorage)) {
    return $this->highWaterStorage;
  }
  $this->highWaterStorage = \Drupal::keyValue('migrate:high_water');
  return $this->highWaterStorage;
}
/**
 * Gets the high-water mark stored for this migration.
 *
 * The mark is not the time of the last run. It is the value of the
 * high-water property (typically a timestamp or serial ID) taken from a
 * previously processed source row. Rows whose value exceeds it are treated
 * as changed (see aboveHighwater()).
 *
 * @return mixed|null
 *   The stored high-water value, or NULL if no high-water mark has been
 *   stored.
 */
protected function getHighWater() {
  $migration_id = $this->migration->id();
  return $this->getHighWaterStorage()->get($migration_id);
}
/**
 * Saves the high-water mark for this migration.
 *
 * @param mixed $high_water
 *   The new mark, normally the high-water property value of a processed
 *   source row (not necessarily an integer timestamp), or NULL to reset it,
 *   as postRollback() does.
 */
protected function saveHighWater($high_water) {
  $this->getHighWaterStorage()->set($this->migration->id(), $high_water);
}
/**
 * Gets information on the property used as the high watermark.
 *
 * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
 *
 * @return array|false
 *   An array with a 'name' key and an optional db 'alias' key, or FALSE when
 *   the constructor found no high_water_property in the configuration.
 */
protected function getHighWaterProperty() {
  return $this->highWaterProperty;
}
556 * Gets the name of the field used as the high watermark.
558 * The name of the field qualified with an alias if available.
560 * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
562 * @return string|null
563 * The name of the field for the high water mark, or NULL if not set.
565 protected function getHighWaterField() {
566 if (!empty($this->highWaterProperty['name'])) {
// Qualify the name as "alias.name" when a table alias is configured;
// otherwise return the bare property name.
567 return !empty($this->highWaterProperty['alias']) ?
568 $this->highWaterProperty['alias'] . '.' . $this->highWaterProperty['name'] :
569 $this->highWaterProperty['name'];
/**
 * {@inheritdoc}
 */
public function preRollback(MigrateRollbackEvent $event) {
  // This base implementation has no pre-rollback work to do.
}

/**
 * {@inheritdoc}
 */
public function postRollback(MigrateRollbackEvent $event) {
  // Once the rollback has finished, reset the stored high-water mark to NULL.
  $this->saveHighWater(NULL);
}
592 public function getSourceModule() {
593 if (!empty($this->configuration['source_module'])) {
594 return $this->configuration['source_module'];
596 elseif (!empty($this->pluginDefinition['source_module'])) {
597 return $this->pluginDefinition['source_module'];