Same name in this branch
  1. 10 core/modules/migrate/src/Plugin/migrate/source/SqlBase.php \Drupal\migrate\Plugin\migrate\source\SqlBase
  2. 10 core/modules/views/src/Plugin/views/pager/SqlBase.php \Drupal\views\Plugin\views\pager\SqlBase
Same name and namespace in other branches
  1. 8.9.x core/modules/migrate/src/Plugin/migrate/source/SqlBase.php \Drupal\migrate\Plugin\migrate\source\SqlBase
  2. 9 core/modules/migrate/src/Plugin/migrate/source/SqlBase.php \Drupal\migrate\Plugin\migrate\source\SqlBase

Sources whose data may be fetched via a database connection.

Available configuration keys:

  • database_state_key: (optional) Name of the state key which contains an array with database connection information.
  • key: (optional) The database key name. Defaults to 'migrate'.
  • target: (optional) The database target name. Defaults to 'default'.
  • batch_size: (optional) Number of records to fetch from the database during each batch. If omitted, all records are fetched in a single query.
  • ignore_map: (optional) Source data is joined to the map table by default to improve migration performance. If set to TRUE, the map table will not be joined. Using expressions in the query may result in column aliases in the JOIN clause which would be invalid SQL. If you run into this, set ignore_map to TRUE.

For other optional configuration keys inherited from the parent class, refer to \Drupal\migrate\Plugin\migrate\source\SourcePluginBase.

About the source database determination:

  • If the source plugin configuration contains 'database_state_key', its value is taken as the name of a state key which contains an array with the database configuration.
  • Otherwise, if the source plugin configuration contains 'key', the database configuration with that name is used.
  • If both 'database_state_key' and 'key' are omitted in the source plugin configuration, the database connection named 'migrate' is used by default.
  • If all of the above steps fail, RequirementsException is thrown.

Drupal Database API supports multiple database connections. The connection parameters are defined in $databases array in settings.php or settings.local.php. It is also possible to modify the $databases array in runtime. For example, Migrate Drupal, which provides the migrations from Drupal 6 / 7, asks for the source database connection parameters in the UI and then adds the $databases['migrate'] connection in runtime before the migrations are executed.

As described above, the default source database is $databases['migrate']. If the source plugin needs another source connection, the database connection parameters should be added to the $databases array as, for instance, $databases['foo']. The source plugin can then use this connection by setting 'key' to 'foo' in its configuration.

For a complete example on migrating data from an SQL source, refer to https://www.drupal.org/docs/8/api/migrate-api/migrating-data-from-sql-so...

Hierarchy

Expanded class hierarchy of SqlBase

See also

https://www.drupal.org/docs/8/api/database-api

\Drupal\migrate_drupal\Plugin\migrate\source\DrupalSqlBase

9 files declare their use of SqlBase
DrupalSqlBase.php in core/modules/migrate_drupal/src/Plugin/migrate/source/DrupalSqlBase.php
HighWaterTest.php in core/modules/migrate/tests/modules/migrate_high_water_test/src/Plugin/migrate/source/HighWaterTest.php
MigrateMissingDatabaseSource.php in core/modules/migrate/tests/modules/migrate_missing_database_test/src/Plugin/migrate/source/MigrateMissingDatabaseSource.php
MigrationPluginListTest.php in core/modules/migrate/tests/src/Kernel/Plugin/MigrationPluginListTest.php
QueryBatchTest.php in core/modules/migrate/tests/modules/migrate_query_batch_test/src/Plugin/migrate/source/QueryBatchTest.php

... See full list

File

core/modules/migrate/src/Plugin/migrate/source/SqlBase.php, line 69

Namespace

Drupal\migrate\Plugin\migrate\source
View source
abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPluginInterface, RequirementsInterface {

  /**
   * The query string.
   *
   * @var \Drupal\Core\Database\Query\SelectInterface
   */
  protected $query;

  /**
   * The database object.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $database;

  /**
   * State service for retrieving database info.
   *
   * @var \Drupal\Core\State\StateInterface
   */
  protected $state;

  /**
   * The count of the number of batches run.
   *
   * @var int
   */
  protected $batch = 0;

  /**
   * Number of records to fetch from the database during each batch.
   *
   * A value of zero indicates no batching is to be done.
   *
   * @var int
   */
  protected $batchSize = 0;

  /**
   * {@inheritdoc}
   */
  public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, StateInterface $state) {
    parent::__construct($configuration, $plugin_id, $plugin_definition, $migration);
    $this->state = $state;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
    return new static($configuration, $plugin_id, $plugin_definition, $migration, $container
      ->get('state'));
  }

  /**
   * Prints the query string when the object is used as a string.
   *
   * @return string
   *   The query string.
   */
  public function __toString() {
    return (string) $this
      ->query();
  }

  /**
   * Gets the database connection object.
   *
   * @return \Drupal\Core\Database\Connection
   *   The database connection.
   */
  public function getDatabase() {
    if (!isset($this->database)) {

      // Look first for an explicit state key containing the configuration.
      if (isset($this->configuration['database_state_key'])) {
        $this->database = $this
          ->setUpDatabase($this->state
          ->get($this->configuration['database_state_key']));
      }
      elseif (isset($this->configuration['key'])) {
        $this->database = $this
          ->setUpDatabase($this->configuration);
      }
      elseif ($fallback_state_key = $this->state
        ->get('migrate.fallback_state_key')) {
        $this->database = $this
          ->setUpDatabase($this->state
          ->get($fallback_state_key));
      }
      else {
        $this->database = $this
          ->setUpDatabase([]);
      }
    }
    return $this->database;
  }

  /**
   * Gets a connection to the referenced database.
   *
   * This method will add the database connection if necessary.
   *
   * @param array $database_info
   *   Configuration for the source database connection. The keys are:
   *    'key' - The database connection key.
   *    'target' - The database connection target.
   *    'database' - Database configuration array as accepted by
   *      Database::addConnectionInfo.
   *
   * @return \Drupal\Core\Database\Connection
   *   The connection to use for this plugin's queries.
   *
   * @throws \Drupal\migrate\Exception\RequirementsException
   *   Thrown if no source database connection is configured.
   */
  protected function setUpDatabase(array $database_info) {

    // If there is no explicit database configuration at all, fall back to a
    // connection named 'migrate'.
    $key = $database_info['key'] ?? 'migrate';
    $target = $database_info['target'] ?? 'default';
    if (isset($database_info['database'])) {
      Database::addConnectionInfo($key, $target, $database_info['database']);
    }
    try {
      $connection = Database::getConnection($target, $key);
    } catch (ConnectionNotDefinedException $e) {

      // If we fell back to the magic 'migrate' connection and it doesn't exist,
      // treat the lack of the connection as a RequirementsException.
      if ($key == 'migrate') {
        throw new RequirementsException("No database connection configured for source plugin " . $this->pluginId, [], 0, $e);
      }
      else {
        throw $e;
      }
    }
    return $connection;
  }

  /**
   * {@inheritdoc}
   */
  public function checkRequirements() {
    if ($this->pluginDefinition['requirements_met'] === TRUE) {
      try {
        $this
          ->getDatabase();
      } catch (\PDOException|DatabaseException $e) {
        throw new RequirementsException("No database connection available for source plugin " . $this->pluginId, [], 0, $e);
      }
    }
  }

  /**
   * Wrapper for database select.
   */
  protected function select($table, $alias = NULL, array $options = []) {
    $options['fetch'] = \PDO::FETCH_ASSOC;
    return $this
      ->getDatabase()
      ->select($table, $alias, $options);
  }

  /**
   * Adds tags and metadata to the query.
   *
   * @return \Drupal\Core\Database\Query\SelectInterface
   *   The query with additional tags and metadata.
   */
  protected function prepareQuery() {
    $this->query = clone $this
      ->query();
    $this->query
      ->addTag('migrate');
    $this->query
      ->addTag('migrate_' . $this->migration
      ->id());
    $this->query
      ->addMetaData('migration', $this->migration);
    return $this->query;
  }

  /**
   * {@inheritdoc}
   */
  protected function initializeIterator() {

    // Initialize the batch size.
    if ($this->batchSize == 0 && isset($this->configuration['batch_size'])) {

      // Valid batch sizes are integers >= 0.
      if (is_int($this->configuration['batch_size']) && $this->configuration['batch_size'] >= 0) {
        $this->batchSize = $this->configuration['batch_size'];
      }
      else {
        throw new MigrateException("batch_size must be greater than or equal to zero");
      }
    }

    // If a batch has run the query is already setup.
    if ($this->batch == 0) {
      $this
        ->prepareQuery();

      // The rules for determining what conditions to add to the query are as
      // follows (applying first applicable rule):
      // 1. If the map is joinable, join it. We will want to accept all rows
      //    which are either not in the map, or marked in the map as NEEDS_UPDATE.
      //    Note that if high water fields are in play, we want to accept all rows
      //    above the high water mark in addition to those selected by the map
      //    conditions, so we need to OR them together (but AND with any existing
      //    conditions in the query). So, ultimately the SQL condition will look
      //    like (original conditions) AND (map IS NULL OR map needs update
      //    OR above high water).
      $conditions = $this->query
        ->orConditionGroup();
      $condition_added = FALSE;
      $added_fields = [];
      if ($this
        ->mapJoinable()) {

        // Build the join to the map table. Because the source key could have
        // multiple fields, we need to build things up.
        $count = 1;
        $map_join = '';
        $delimiter = '';
        foreach ($this
          ->getIds() as $field_name => $field_schema) {
          if (isset($field_schema['alias'])) {
            $field_name = $field_schema['alias'] . '.' . $this->query
              ->escapeField($field_name);
          }
          $map_join .= "{$delimiter}{$field_name} = map.sourceid" . $count++;
          $delimiter = ' AND ';
        }
        $alias = $this->query
          ->leftJoin($this->migration
          ->getIdMap()
          ->getQualifiedMapTableName(), 'map', $map_join);
        $conditions
          ->isNull($alias . '.sourceid1');
        $conditions
          ->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
        $condition_added = TRUE;

        // And as long as we have the map table, add its data to the row.
        $n = count($this
          ->getIds());
        for ($count = 1; $count <= $n; $count++) {
          $map_key = 'sourceid' . $count;
          $this->query
            ->addField($alias, $map_key, "migrate_map_{$map_key}");
          $added_fields[] = "{$alias}.{$map_key}";
        }
        if ($n = count($this->migration
          ->getDestinationIds())) {
          for ($count = 1; $count <= $n; $count++) {
            $map_key = 'destid' . $count++;
            $this->query
              ->addField($alias, $map_key, "migrate_map_{$map_key}");
            $added_fields[] = "{$alias}.{$map_key}";
          }
        }
        $this->query
          ->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
        $added_fields[] = "{$alias}.source_row_status";
      }

      // 2. If we are using high water marks, also include rows above the mark.
      //    But, include all rows if the high water mark is not set.
      if ($this
        ->getHighWaterProperty()) {
        $high_water_field = $this
          ->getHighWaterField();
        $high_water = $this
          ->getHighWater();

        // We check against NULL because 0 is an acceptable value for the high
        // water mark.
        if ($high_water !== NULL) {
          $conditions
            ->condition($high_water_field, $high_water, '>');
          $condition_added = TRUE;
        }

        // Always sort by the high water field, to ensure that the first run
        // (before we have a high water value) also has the results in a
        // consistent order.
        $this->query
          ->orderBy($high_water_field);
      }
      if ($condition_added) {
        $this->query
          ->condition($conditions);
      }

      // If the query has a group by, our added fields need it too, to keep the
      // query valid.
      // @see https://dev.mysql.com/doc/refman/5.7/en/group-by-handling.html
      $group_by = $this->query
        ->getGroupBy();
      if ($group_by && $added_fields) {
        foreach ($added_fields as $added_field) {
          $this->query
            ->groupBy($added_field);
        }
      }
    }

    // Download data in batches for performance.
    if ($this->batchSize > 0) {
      $this->query
        ->range($this->batch * $this->batchSize, $this->batchSize);
    }
    $statement = $this->query
      ->execute();
    $statement
      ->setFetchMode(\PDO::FETCH_ASSOC);
    return new \IteratorIterator($statement);
  }

  /**
   * Position the iterator to the following row.
   */
  protected function fetchNextRow() {
    $this
      ->getIterator()
      ->next();

    // We might be out of data entirely, or just out of data in the current
    // batch. Attempt to fetch the next batch and see.
    if ($this->batchSize > 0 && !$this
      ->getIterator()
      ->valid()) {
      $this
        ->fetchNextBatch();
    }
  }

  /**
   * Prepares query for the next set of data from the source database.
   */
  protected function fetchNextBatch() {
    $this->batch++;
    unset($this->iterator);
    $this
      ->getIterator()
      ->rewind();
  }

  /**
   * {@inheritdoc}
   */
  public function rewind() : void {
    $this->batch = 0;

    // Database queries have to be run again as they cannot be rewound.
    unset($this->iterator);
    parent::rewind();
  }

  /**
   * @return \Drupal\Core\Database\Query\SelectInterface
   */
  public abstract function query();

  /**
   * Gets the source count using countQuery().
   */
  protected function doCount() {
    return (int) $this
      ->query()
      ->countQuery()
      ->execute()
      ->fetchField();
  }

  /**
   * Checks if we can join against the map table.
   *
   * This function specifically catches issues when we're migrating with
   * unique sets of credentials for the source and destination database.
   *
   * @return bool
   *   TRUE if we can join against the map table otherwise FALSE.
   */
  protected function mapJoinable() {

    // Do not join map if explicitly configured not to.
    if (isset($this->configuration['ignore_map']) && $this->configuration['ignore_map']) {
      return FALSE;
    }

    // If we are using high water, but haven't yet set a high water mark, do not
    // join the map table, as we want to get all available records.
    if ($this
      ->getHighWaterProperty() && $this
      ->getHighWater() === NULL) {
      return FALSE;
    }

    // If we are tracking changes, we also need to retrieve all rows to compare
    // hashes
    if ($this->trackChanges) {
      return FALSE;
    }
    if (!$this
      ->getIds()) {
      return FALSE;
    }

    // With batching, we want a later batch to return the same rows that would
    // have been returned at the same point within a monolithic query. If we
    // join to the map table, the first batch is writing to the map table and
    // thus affecting the results of subsequent batches. To be safe, we avoid
    // joining to the map table when batching.
    if ($this->batchSize > 0) {
      return FALSE;
    }
    $id_map = $this->migration
      ->getIdMap();
    if (!$id_map instanceof Sql) {
      return FALSE;
    }
    $id_map_database_options = $id_map
      ->getDatabase()
      ->getConnectionOptions();
    $source_database_options = $this
      ->getDatabase()
      ->getConnectionOptions();

    // Special handling for sqlite which deals with files.
    if ($id_map_database_options['driver'] === 'sqlite' && $source_database_options['driver'] === 'sqlite' && $id_map_database_options['database'] != $source_database_options['database']) {
      return FALSE;
    }

    // FALSE if driver is PostgreSQL and database doesn't match.
    if ($id_map_database_options['driver'] === 'pgsql' && $source_database_options['driver'] === 'pgsql' && $id_map_database_options['database'] != $source_database_options['database']) {
      return FALSE;
    }
    foreach ([
      'username',
      'password',
      'host',
      'port',
      'namespace',
      'driver',
    ] as $key) {
      if (isset($source_database_options[$key]) && isset($id_map_database_options[$key])) {
        if ($id_map_database_options[$key] != $source_database_options[$key]) {
          return FALSE;
        }
      }
    }
    return TRUE;
  }

  /**
   * {@inheritdoc}
   */
  public function __sleep() {
    return array_diff(parent::__sleep(), [
      'database',
    ]);
  }

}

Members

Namesort ascending Modifiers Type Description Overrides
StringTranslationTrait::t protected function Translates a string to the current language or to a given language.
StringTranslationTrait::setStringTranslation public function Sets the string translation service to use. 1
StringTranslationTrait::getStringTranslation protected function Gets the string translation service.
StringTranslationTrait::getNumberOfPlurals protected function Returns the number of plurals supported by a given language.
StringTranslationTrait::formatPlural protected function Formats a string containing a count of items.
StringTranslationTrait::$stringTranslation protected property The string translation service. 3
SqlBase::__toString public function Prints the query string when the object is used as a string. Overrides MigrateSourceInterface::__toString
SqlBase::__sleep public function Overrides DependencySerializationTrait::__sleep
SqlBase::__construct public function Constructs a \Drupal\Component\Plugin\PluginBase object. Overrides SourcePluginBase::__construct 3
SqlBase::setUpDatabase protected function Gets a connection to the referenced database.
SqlBase::select protected function Wrapper for database select.
SqlBase::rewind public function Overrides SourcePluginBase::rewind
SqlBase::query abstract public function
SqlBase::prepareQuery protected function Adds tags and metadata to the query.
SqlBase::mapJoinable protected function Checks if we can join against the map table. 1
SqlBase::initializeIterator protected function Initializes the iterator with the source data. Overrides SourcePluginBase::initializeIterator 18
SqlBase::getDatabase public function Gets the database connection object. 2
SqlBase::fetchNextRow protected function Position the iterator to the following row. Overrides SourcePluginBase::fetchNextRow
SqlBase::fetchNextBatch protected function Prepares query for the next set of data from the source database.
SqlBase::doCount protected function Gets the source count using countQuery(). Overrides SourcePluginBase::doCount 6
SqlBase::create public static function Creates an instance of the plugin. Overrides ContainerFactoryPluginInterface::create 1
SqlBase::checkRequirements public function Checks if requirements for this plugin are OK. Overrides RequirementsInterface::checkRequirements 1
SqlBase::$state protected property State service for retrieving database info.
SqlBase::$query protected property The query string.
SqlBase::$database protected property The database object. 1
SqlBase::$batchSize protected property Number of records to fetch from the database during each batch.
SqlBase::$batch protected property The count of the number of batches run.
SourcePluginBase::valid public function
SourcePluginBase::saveHighWater protected function Save the new high water mark.
SourcePluginBase::rowChanged protected function Checks if the incoming row has changed since our last import.
SourcePluginBase::preRollback public function Performs pre-rollback tasks. Overrides RollbackAwareInterface::preRollback
SourcePluginBase::prepareRow public function Adds additional data to the row. Overrides MigrateSourceInterface::prepareRow 49
SourcePluginBase::postRollback public function Performs post-rollback tasks. Overrides RollbackAwareInterface::postRollback
SourcePluginBase::next public function
SourcePluginBase::key public function
SourcePluginBase::getSourceModule public function Gets the source module providing the source data. Overrides MigrateSourceInterface::getSourceModule
SourcePluginBase::getModuleHandler protected function Gets the module handler.
SourcePluginBase::getIterator protected function Returns the iterator that will yield the row arrays to be processed.
SourcePluginBase::getHighWaterStorage protected function Get the high water storage object.
SourcePluginBase::getHighWaterProperty protected function Get information on the property used as the high watermark.
SourcePluginBase::getHighWaterField protected function Get the name of the field used as the high watermark.
SourcePluginBase::getHighWater protected function The current value of the high water mark.
SourcePluginBase::getCurrentIds public function Gets the currentSourceIds data member.
SourcePluginBase::getCache protected function Gets the cache object.
SourcePluginBase::current public function
SourcePluginBase::count public function 2
SourcePluginBase::aboveHighWater protected function Check if the incoming data is newer than what we've previously imported.
SourcePluginBase::$trackChanges protected property Flags whether to track changes to incoming data. 1
SourcePluginBase::$skipCount protected property Whether this instance should not attempt to count the source. 1
SourcePluginBase::$originalHighWater protected property The high water mark at the beginning of the import operation.
SourcePluginBase::$moduleHandler protected property The module handler service. 2
SourcePluginBase::$migration protected property The entity migration object.
SourcePluginBase::$mapRowAdded protected property Flags whether source plugin will read the map row and add to data row.
SourcePluginBase::$iterator protected property The iterator to iterate over the source rows.
SourcePluginBase::$idMap protected property The migration ID map.
SourcePluginBase::$highWaterStorage protected property The key-value storage for the high-water value.
SourcePluginBase::$highWaterProperty protected property Information on the property used as the high-water mark.
SourcePluginBase::$currentSourceIds protected property The primary key of the current row.
SourcePluginBase::$currentRow protected property The current row from the query.
SourcePluginBase::$cacheKey protected property Key to use for caching counts.
SourcePluginBase::$cacheCounts protected property Whether this instance should cache the source count. 1
SourcePluginBase::$cache protected property The backend cache.
PluginBase::isConfigurable public function Determines if the plugin is configurable.
PluginBase::getPluginId public function Gets the plugin_id of the plugin instance. Overrides PluginInspectionInterface::getPluginId
PluginBase::getPluginDefinition public function Gets the definition of the plugin implementation. Overrides PluginInspectionInterface::getPluginDefinition 2
PluginBase::getDerivativeId public function Gets the derivative_id of the plugin instance. Overrides DerivativeInspectionInterface::getDerivativeId
PluginBase::getBaseId public function Gets the base_plugin_id of the plugin instance. Overrides DerivativeInspectionInterface::getBaseId
PluginBase::DERIVATIVE_SEPARATOR constant A string which is used to separate base plugin IDs from the derivative ID.
PluginBase::$pluginId protected property The plugin_id.
PluginBase::$pluginDefinition protected property The plugin implementation definition. 1
PluginBase::$configuration protected property Configuration information passed into the plugin. 1
MigrateSourceInterface::NOT_COUNTABLE constant Indicates that the source is not countable.
MigrateSourceInterface::getIds public function Defines the source fields uniquely identifying a source row. 87
MigrateSourceInterface::fields public function Returns available fields on the source. 87
MessengerTrait::setMessenger public function Sets the messenger.
MessengerTrait::messenger public function Gets the messenger. 10
MessengerTrait::$messenger protected property The messenger. 10
DependencySerializationTrait::__wakeup public function 2
DependencySerializationTrait::$_serviceIds protected property
DependencySerializationTrait::$_entityStorages protected property