Same name and namespace in other branches
  1. 8.9.x core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php \Drupal\migrate\Plugin\migrate\source\SourcePluginBase
  2. 9 core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php \Drupal\migrate\Plugin\migrate\source\SourcePluginBase

The base class for source plugins.

The migration uses the next() method to iterate over rows returned by the source plugin. Information about the row is also tracked using the ID map plugin. For each row, the corresponding tracked map row, if it exists, is deleted before allowing modification to the source row. Then, source plugins can modify the row using the prepareRow() method, which also invokes hook_prepare_row(). The row is now prepared and we can decide if it will be processed.

To be processed the row must meet any of these conditions:

When set to be processed, the row is also marked frozen and no further changes to the row source properties are allowed. The last step is to set the high-water value, if high water is in use.

Available configuration keys:

  • cache_counts: (optional) If set, cache the source count.
  • cache_key: (optional) Uniquely named cache key used for cache_counts.
  • skip_count: (optional) If set, do not attempt to count the source.
  • track_changes: (optional) If set, track changes to incoming data.
  • high_water_property: (optional) It is an array of name & alias values (optional table alias). This high_water_property is typically a timestamp or serial id showing what was the last imported record. Only content with a higher value will be imported.
  • constants: (optional) An array of constants that can be used in the process pipeline. To use the constant 'foo' as a source value use 'constants/foo'.

The high_water_property and track_changes are mutually exclusive.

Example:


source:
  plugin: some_source_plugin_name
  cache_counts: true
  track_changes: true

This example uses the plugin "some_source_plugin_name" and caches the count of available source records to save calculating it every time count() is called. Changes to incoming data are watched (because track_changes is true), which can affect the result of prepareRow().

Example:


source:
  plugin: some_source_plugin_name
  skip_count: true
  high_water_property:
    name: changed
    alias: n

In this example, skip_count is true which means count() will not attempt to count the available source records, but just always return MigrateSourceInterface::NOT_COUNTABLE instead. The high_water_property defines which field marks the last imported row of the migration. This will get converted into a SQL condition that looks like 'n.changed' or 'changed' if no alias.

Example:


source:
  plugin: some_source_plugin_name
  constants:
    - foo: bar
process:
  baz: constants/bar

In this example, the constant 'foo' is defined with a value of 'bar'. It is later used in the process pipeline to set the value of the field baz.

Hierarchy

Expanded class hierarchy of SourcePluginBase

See also

\Drupal\migrate\Annotation\MigrateSource

\Drupal\migrate\Plugin\MigrateIdMapInterface

\Drupal\migrate\Plugin\MigratePluginManager

\Drupal\migrate\Plugin\MigrateSourceInterface

Plugin API

Related topics

6 files declare their use of SourcePluginBase
BlockTranslation.php in core/modules/block/src/Plugin/migrate/source/d6/BlockTranslation.php
CacheableEmbeddedDataSource.php in core/modules/migrate/tests/modules/migrate_cache_counts_test/src/Plugin/migrate/source/CacheableEmbeddedDataSource.php
ContentEntity.php in core/modules/migrate_drupal/src/Plugin/migrate/source/ContentEntity.php
MigrateExternalTranslatedTestSource.php in core/modules/migrate/tests/modules/migrate_external_translated_test/src/Plugin/migrate/source/MigrateExternalTranslatedTestSource.php
MigrateSourceTest.php in core/modules/migrate/tests/src/Unit/MigrateSourceTest.php

... See full list

File

core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php, line 117

Namespace

Drupal\migrate\Plugin\migrate\source
View source
abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface, RollbackAwareInterface {

  /**
   * The module handler service.
   *
   * @var \Drupal\Core\Extension\ModuleHandlerInterface
   */
  protected $moduleHandler;

  /**
   * The entity migration object.
   *
   * @var \Drupal\migrate\Plugin\MigrationInterface
   */
  protected $migration;

  /**
   * The current row from the query.
   *
   * @var \Drupal\migrate\Row
   */
  protected $currentRow;

  /**
   * The primary key of the current row.
   *
   * @var array
   */
  protected $currentSourceIds;

  /**
   * Information on the property used as the high-water mark.
   *
   * Array of 'name' and (optional) db 'alias' properties used for high-water
   * mark.
   *
   * @var array
   */
  protected $highWaterProperty = [];

  /**
   * The key-value storage for the high-water value.
   *
   * @var \Drupal\Core\KeyValueStore\KeyValueStoreInterface
   */
  protected $highWaterStorage;

  /**
   * The high water mark at the beginning of the import operation.
   *
   * If the source has a property for tracking changes (like Drupal has
   * node.changed) then this is the highest value of those imported so far.
   *
   * @var int
   */
  protected $originalHighWater;

  /**
   * Whether this instance should cache the source count.
   *
   * @var bool
   */
  protected $cacheCounts = FALSE;

  /**
   * Key to use for caching counts.
   *
   * @var string
   */
  protected $cacheKey;

  /**
   * Whether this instance should not attempt to count the source.
   *
   * @var bool
   */
  protected $skipCount = FALSE;

  /**
   * Flags whether to track changes to incoming data.
   *
   * If TRUE, we will maintain hashed source rows to determine whether incoming
   * data has changed.
   *
   * @var bool
   */
  protected $trackChanges = FALSE;

  /**
   * Flags whether source plugin will read the map row and add to data row.
   *
   * By default, next() will directly read the map row and add it to the data
   * row. A source plugin implementation may do this itself (in particular, the
   * SQL source can incorporate the map table into the query) - if so, it should
   * set this TRUE so we don't duplicate the effort.
   *
   * @var bool
   */
  protected $mapRowAdded = FALSE;

  /**
   * The backend cache.
   *
   * @var \Drupal\Core\Cache\CacheBackendInterface
   */
  protected $cache;

  /**
   * The migration ID map.
   *
   * @var \Drupal\migrate\Plugin\MigrateIdMapInterface
   */
  protected $idMap;

  /**
   * The iterator to iterate over the source rows.
   *
   * @var \Iterator
   */
  protected $iterator;

  /**
   * {@inheritdoc}
   */
  public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
    $this->migration = $migration;

    // Set up some defaults based on the source configuration.
    foreach ([
      'cacheCounts' => 'cache_counts',
      'skipCount' => 'skip_count',
      'trackChanges' => 'track_changes',
    ] as $property => $config_key) {
      if (isset($configuration[$config_key])) {
        $this->{$property} = (bool) $configuration[$config_key];
      }
    }
    if ($this->cacheCounts) {
      $this->cacheKey = $configuration['cache_key'] ?? $plugin_id . '-' . hash('sha256', Json::encode($configuration));
    }
    $this->idMap = $this->migration
      ->getIdMap();
    $this->highWaterProperty = !empty($configuration['high_water_property']) ? $configuration['high_water_property'] : FALSE;

    // Pull out the current high-water mark if we have a high-water property.
    if ($this->highWaterProperty) {
      $this->originalHighWater = $this
        ->getHighWater();
    }

    // Don't allow the use of both high water and track changes together.
    if ($this->highWaterProperty && $this->trackChanges) {
      throw new MigrateException('You should either use a high-water mark or track changes not both. They are both designed to solve the same problem');
    }
  }

  /**
   * Initializes the iterator with the source data.
   *
   * @return \Iterator
   *   Returns an iterable object of data for this source.
   */
  protected abstract function initializeIterator();

  /**
   * Gets the module handler.
   *
   * @return \Drupal\Core\Extension\ModuleHandlerInterface
   *   The module handler.
   */
  protected function getModuleHandler() {
    if (!isset($this->moduleHandler)) {
      $this->moduleHandler = \Drupal::moduleHandler();
    }
    return $this->moduleHandler;
  }

  /**
   * {@inheritdoc}
   */
  public function prepareRow(Row $row) {
    $result = TRUE;
    try {
      $result_hook = $this
        ->getModuleHandler()
        ->invokeAll('migrate_prepare_row', [
        $row,
        $this,
        $this->migration,
      ]);
      $result_named_hook = $this
        ->getModuleHandler()
        ->invokeAll('migrate_' . $this->migration
        ->id() . '_prepare_row', [
        $row,
        $this,
        $this->migration,
      ]);

      // We will skip if any hook returned FALSE.
      $skip = $result_hook && in_array(FALSE, $result_hook) || $result_named_hook && in_array(FALSE, $result_named_hook);
      $save_to_map = TRUE;
    } catch (MigrateSkipRowException $e) {
      $skip = TRUE;
      $save_to_map = $e
        ->getSaveToMap();
      if ($message = trim($e
        ->getMessage())) {
        $this->idMap
          ->saveMessage($row
          ->getSourceIdValues(), $message, MigrationInterface::MESSAGE_INFORMATIONAL);
      }
    }

    // We're explicitly skipping this row - keep track in the map table.
    if ($skip) {

      // Make sure we replace any previous messages for this item with any
      // new ones.
      if ($save_to_map) {
        $this->idMap
          ->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);
        $this->currentRow = NULL;
        $this->currentSourceIds = NULL;
      }
      $result = FALSE;
    }
    elseif ($this->trackChanges) {

      // When tracking changed data, We want to quietly skip (rather than
      // "ignore") rows with changes. The caller needs to make that decision,
      // so we need to provide them with the necessary information (before and
      // after hashes).
      $row
        ->rehash();
    }
    return $result;
  }

  /**
   * Returns the iterator that will yield the row arrays to be processed.
   *
   * @return \Iterator
   *   The iterator that will yield the row arrays to be processed.
   */
  protected function getIterator() {
    if (!isset($this->iterator)) {
      $this->iterator = $this
        ->initializeIterator();
    }
    return $this->iterator;
  }

  /**
   * {@inheritdoc}
   */

  #[\ReturnTypeWillChange]
  public function current() {
    return $this->currentRow;
  }

  /**
   * Gets the iterator key.
   *
   * Implementation of \Iterator::key() - called when entering a loop iteration,
   * returning the key of the current row. It must be a scalar - we will
   * serialize to fulfill the requirement, but using getCurrentIds() is
   * preferable.
   */

  #[\ReturnTypeWillChange]
  public function key() {
    return serialize($this->currentSourceIds);
  }

  /**
   * Checks whether the iterator is currently valid.
   *
   * Implementation of \Iterator::valid() - called at the top of the loop,
   * returning TRUE to process the loop and FALSE to terminate it.
   */

  #[\ReturnTypeWillChange]
  public function valid() {
    return isset($this->currentRow);
  }

  /**
   * Rewinds the iterator.
   *
   * Implementation of \Iterator::rewind() - subclasses of SourcePluginBase
   * should implement initializeIterator() to do any class-specific setup for
   * iterating source records.
   */

  #[\ReturnTypeWillChange]
  public function rewind() {
    $this
      ->getIterator()
      ->rewind();
    $this
      ->next();
  }

  /**
   * {@inheritdoc}
   */

  #[\ReturnTypeWillChange]
  public function next() {
    $this->currentSourceIds = NULL;
    $this->currentRow = NULL;

    // In order to find the next row we want to process, we ask the source
    // plugin for the next possible row.
    while (!isset($this->currentRow) && $this
      ->getIterator()
      ->valid()) {
      $row_data = $this
        ->getIterator()
        ->current() + $this->configuration;
      $this
        ->fetchNextRow();
      $row = new Row($row_data, $this
        ->getIds());

      // Populate the source key for this row.
      $this->currentSourceIds = $row
        ->getSourceIdValues();

      // Pick up the existing map row, if any, unless fetchNextRow() did it.
      if (!$this->mapRowAdded && ($id_map = $this->idMap
        ->getRowBySource($this->currentSourceIds))) {
        $row
          ->setIdMap($id_map);
      }

      // Clear any previous messages for this row before potentially adding
      // new ones.
      if (!empty($this->currentSourceIds)) {
        $this->idMap
          ->delete($this->currentSourceIds, TRUE);
      }

      // Preparing the row gives source plugins the chance to skip.
      if ($this
        ->prepareRow($row) === FALSE) {
        continue;
      }

      // Check whether the row needs processing.
      // 1. This row has not been imported yet.
      // 2. Explicitly set to update.
      // 3. The row is newer than the current high-water mark.
      // 4. If no such property exists then try by checking the hash of the row.
      if (!$row
        ->getIdMap() || $row
        ->needsUpdate() || $this
        ->aboveHighWater($row) || $this
        ->rowChanged($row)) {
        $this->currentRow = $row
          ->freezeSource();
      }
      if ($this
        ->getHighWaterProperty()) {
        $this
          ->saveHighWater($row
          ->getSourceProperty($this->highWaterProperty['name']));
      }
    }
  }

  /**
   * Position the iterator to the following row.
   */
  protected function fetchNextRow() {
    $this
      ->getIterator()
      ->next();
  }

  /**
   * Check if the incoming data is newer than what we've previously imported.
   *
   * @param \Drupal\migrate\Row $row
   *   The row we're importing.
   *
   * @return bool
   *   TRUE if the high-water value in the row is greater than our current
   *   value.
   */
  protected function aboveHighWater(Row $row) {
    return $this
      ->getHighWaterProperty() && $row
      ->getSourceProperty($this->highWaterProperty['name']) > $this->originalHighWater;
  }

  /**
   * Checks if the incoming row has changed since our last import.
   *
   * @param \Drupal\migrate\Row $row
   *   The row we're importing.
   *
   * @return bool
   *   TRUE if the row has changed otherwise FALSE.
   */
  protected function rowChanged(Row $row) {
    return $this->trackChanges && $row
      ->changed();
  }

  /**
   * Gets the currentSourceIds data member.
   */
  public function getCurrentIds() {
    return $this->currentSourceIds;
  }

  /**
   * Gets the source count.
   *
   * Return a count of available source records, from the cache if appropriate.
   * Returns MigrateSourceInterface::NOT_COUNTABLE if the source is not
   * countable.
   *
   * @param bool $refresh
   *   (optional) Whether or not to refresh the count. Defaults to FALSE. Not
   *   all implementations support the reset flag. In such instances this
   *   parameter is ignored and the result of calling the method will always be
   *   up to date.
   *
   * @return int
   *   The count.
   */

  #[\ReturnTypeWillChange]
  public function count($refresh = FALSE) {
    if ($this->skipCount) {
      return MigrateSourceInterface::NOT_COUNTABLE;
    }

    // Return the cached count if we are caching counts and a refresh is not
    // requested.
    if ($this->cacheCounts && !$refresh) {
      $cache_object = $this
        ->getCache()
        ->get($this->cacheKey, 'cache');
      if (is_object($cache_object)) {
        return $cache_object->data;
      }
    }
    $count = $this
      ->doCount();

    // Update the cache if we are caching counts.
    if ($this->cacheCounts) {
      $this
        ->getCache()
        ->set($this->cacheKey, $count);
    }
    return $count;
  }

  /**
   * Gets the cache object.
   *
   * @return \Drupal\Core\Cache\CacheBackendInterface
   *   The cache object.
   */
  protected function getCache() {
    if (!isset($this->cache)) {
      $this->cache = \Drupal::cache('migrate');
    }
    return $this->cache;
  }

  /**
   * Gets the source count.
   *
   * Checks if the source is countable or using the iterator_count function.
   *
   * @return int
   */
  protected function doCount() {
    $iterator = $this
      ->getIterator();
    return $iterator instanceof \Countable ? $iterator
      ->count() : iterator_count($this
      ->initializeIterator());
  }

  /**
   * Get the high water storage object.
   *
   * @return \Drupal\Core\KeyValueStore\KeyValueStoreInterface
   *   The storage object.
   */
  protected function getHighWaterStorage() {
    if (!isset($this->highWaterStorage)) {
      $this->highWaterStorage = \Drupal::keyValue('migrate:high_water');
    }
    return $this->highWaterStorage;
  }

  /**
   * The current value of the high water mark.
   *
   * The high water mark defines a timestamp stating the time the import was last
   * run. If the mark is set, only content with a higher timestamp will be
   * imported.
   *
   * @return int|null
   *   A Unix timestamp representing the high water mark, or NULL if no high
   *   water mark has been stored.
   */
  protected function getHighWater() {
    return $this
      ->getHighWaterStorage()
      ->get($this->migration
      ->id());
  }

  /**
   * Save the new high water mark.
   *
   * @param int $high_water
   *   The high water timestamp.
   */
  protected function saveHighWater($high_water) {
    $this
      ->getHighWaterStorage()
      ->set($this->migration
      ->id(), $high_water);
  }

  /**
   * Get information on the property used as the high watermark.
   *
   * Array of 'name' & (optional) db 'alias' properties used for high watermark.
   *
   * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
   *
   * @return array
   *   The property used as the high watermark.
   */
  protected function getHighWaterProperty() {
    return $this->highWaterProperty;
  }

  /**
   * Get the name of the field used as the high watermark.
   *
   * The name of the field qualified with an alias if available.
   *
   * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
   *
   * @return string|null
   *   The name of the field for the high water mark, or NULL if not set.
   */
  protected function getHighWaterField() {
    if (!empty($this->highWaterProperty['name'])) {
      return !empty($this->highWaterProperty['alias']) ? $this->highWaterProperty['alias'] . '.' . $this->highWaterProperty['name'] : $this->highWaterProperty['name'];
    }
    return NULL;
  }

  /**
   * {@inheritdoc}
   */
  public function preRollback(MigrateRollbackEvent $event) {

    // Nothing to do in this implementation.
  }

  /**
   * {@inheritdoc}
   */
  public function postRollback(MigrateRollbackEvent $event) {

    // Reset the high-water mark.
    $this
      ->saveHighWater(NULL);
  }

  /**
   * {@inheritdoc}
   */
  public function getSourceModule() {
    if (!empty($this->configuration['source_module'])) {
      return $this->configuration['source_module'];
    }
    elseif (!empty($this->pluginDefinition['source_module'])) {
      return $this->pluginDefinition['source_module'];
    }
    return NULL;
  }

}

Members

Name Modifierssort descending Type Description Overrides
PluginBase::DERIVATIVE_SEPARATOR constant A string which is used to separate base plugin IDs from the derivative ID.
MigrateSourceInterface::NOT_COUNTABLE constant Indicates that the source is not countable.
SourcePluginBase::initializeIterator abstract protected function Initializes the iterator with the source data. 6
SourcePluginBase::getModuleHandler protected function Gets the module handler.
SourcePluginBase::getIterator protected function Returns the iterator that will yield the row arrays to be processed.
SourcePluginBase::fetchNextRow protected function Position the iterator to the following row. 1
SourcePluginBase::aboveHighWater protected function Check if the incoming data is newer than what we've previously imported.
SourcePluginBase::rowChanged protected function Checks if the incoming row has changed since our last import.
SourcePluginBase::getCache protected function Gets the cache object.
SourcePluginBase::doCount protected function Gets the source count. 4
SourcePluginBase::getHighWaterStorage protected function Get the high water storage object.
SourcePluginBase::getHighWater protected function The current value of the high water mark.
SourcePluginBase::saveHighWater protected function Save the new high water mark.
SourcePluginBase::getHighWaterProperty protected function Get information on the property used as the high watermark.
SourcePluginBase::getHighWaterField protected function Get the name of the field used as the high watermark.
StringTranslationTrait::t protected function Translates a string to the current language or to a given language.
StringTranslationTrait::formatPlural protected function Formats a string containing a count of items.
StringTranslationTrait::getNumberOfPlurals protected function Returns the number of plurals supported by a given language.
StringTranslationTrait::getStringTranslation protected function Gets the string translation service.
SourcePluginBase::$moduleHandler protected property The module handler service. 2
SourcePluginBase::$migration protected property The entity migration object.
SourcePluginBase::$currentRow protected property The current row from the query.
SourcePluginBase::$currentSourceIds protected property The primary key of the current row.
SourcePluginBase::$highWaterProperty protected property Information on the property used as the high-water mark.
SourcePluginBase::$highWaterStorage protected property The key-value storage for the high-water value.
SourcePluginBase::$originalHighWater protected property The high water mark at the beginning of the import operation.
SourcePluginBase::$cacheCounts protected property Whether this instance should cache the source count. 1
SourcePluginBase::$cacheKey protected property Key to use for caching counts.
SourcePluginBase::$skipCount protected property Whether this instance should not attempt to count the source. 1
SourcePluginBase::$trackChanges protected property Flags whether to track changes to incoming data. 1
SourcePluginBase::$mapRowAdded protected property Flags whether source plugin will read the map row and add to data row.
SourcePluginBase::$cache protected property The backend cache.
SourcePluginBase::$idMap protected property The migration ID map.
SourcePluginBase::$iterator protected property The iterator to iterate over the source rows.
StringTranslationTrait::$stringTranslation protected property The string translation service. 3
DependencySerializationTrait::$_serviceIds protected property
DependencySerializationTrait::$_entityStorages protected property
MessengerTrait::$messenger protected property The messenger. 10
PluginBase::$pluginId protected property The plugin_id.
PluginBase::$pluginDefinition protected property The plugin implementation definition. 1
PluginBase::$configuration protected property Configuration information passed into the plugin. 1
SourcePluginBase::__construct public function Constructs a \Drupal\Component\Plugin\PluginBase object. Overrides PluginBase::__construct 4
SourcePluginBase::prepareRow public function Adds additional data to the row. Overrides MigrateSourceInterface::prepareRow 49
SourcePluginBase::current public function
SourcePluginBase::key public function
SourcePluginBase::valid public function
SourcePluginBase::rewind public function 1
SourcePluginBase::next public function
SourcePluginBase::getCurrentIds public function Gets the currentSourceIds data member.
SourcePluginBase::count public function 2
SourcePluginBase::preRollback public function Performs pre-rollback tasks. Overrides RollbackAwareInterface::preRollback
SourcePluginBase::postRollback public function Performs post-rollback tasks. Overrides RollbackAwareInterface::postRollback
SourcePluginBase::getSourceModule public function Gets the source module providing the source data. Overrides MigrateSourceInterface::getSourceModule
StringTranslationTrait::setStringTranslation public function Sets the string translation service to use. 1
DependencySerializationTrait::__sleep public function 2
DependencySerializationTrait::__wakeup public function 2
MessengerTrait::setMessenger public function Sets the messenger.
MessengerTrait::messenger public function Gets the messenger. 10
PluginBase::getPluginId public function Gets the plugin_id of the plugin instance. Overrides PluginInspectionInterface::getPluginId
PluginBase::getBaseId public function Gets the base_plugin_id of the plugin instance. Overrides DerivativeInspectionInterface::getBaseId
PluginBase::getDerivativeId public function Gets the derivative_id of the plugin instance. Overrides DerivativeInspectionInterface::getDerivativeId
PluginBase::getPluginDefinition public function Gets the definition of the plugin implementation. Overrides PluginInspectionInterface::getPluginDefinition 2
PluginBase::isConfigurable public function Determines if the plugin is configurable.
MigrateSourceInterface::fields public function Returns available fields on the source. 87
MigrateSourceInterface::__toString public function Allows class to decide how it will react when it is treated like a string. 6
MigrateSourceInterface::getIds public function Defines the source fields uniquely identifying a source row. 87