View source
<?php
namespace Drupal\migrate;
use Drupal\Component\Utility\Bytes;
use Drupal\Core\StringTranslation\ByteSizeMarkup;
use Drupal\Core\Utility\Error;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use Drupal\migrate\Event\MigrateEvents;
use Drupal\migrate\Event\MigrateImportEvent;
use Drupal\migrate\Event\MigratePostRowSaveEvent;
use Drupal\migrate\Event\MigratePreRowSaveEvent;
use Drupal\migrate\Event\MigrateRollbackEvent;
use Drupal\migrate\Event\MigrateRowDeleteEvent;
use Drupal\migrate\Exception\RequirementsException;
use Drupal\migrate\Plugin\MigrateIdMapInterface;
use Drupal\migrate\Plugin\MigrationInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
class MigrateExecutable implements MigrateExecutableInterface {
use StringTranslationTrait;
protected $migration;
protected $sourceRowStatus;
protected $memoryThreshold = 0.85;
protected $memoryLimit;
protected $sourceIdValues;
protected $counts = [];
protected $source;
protected $eventDispatcher;
public $message;
public function __construct(MigrationInterface $migration, MigrateMessageInterface $message = NULL, EventDispatcherInterface $event_dispatcher = NULL) {
$this->migration = $migration;
$this->message = $message ?: new MigrateMessage();
$this
->getIdMap()
->setMessage($this->message);
$this->eventDispatcher = $event_dispatcher;
$limit = trim(ini_get('memory_limit'));
if ($limit == '-1') {
$this->memoryLimit = PHP_INT_MAX;
}
else {
$this->memoryLimit = Bytes::toNumber($limit);
}
}
protected function getSource() {
if (!isset($this->source)) {
$this->source = $this->migration
->getSourcePlugin();
}
return $this->source;
}
protected function getEventDispatcher() {
if (!$this->eventDispatcher) {
$this->eventDispatcher = \Drupal::service('event_dispatcher');
}
return $this->eventDispatcher;
}
public function import() {
if ($this->migration
->getStatus() !== MigrationInterface::STATUS_IDLE) {
$this->message
->display($this
->t('Migration @id is busy with another operation: @status', [
'@id' => $this->migration
->id(),
'@status' => $this
->t($this->migration
->getStatusLabel()),
]), 'error');
return MigrationInterface::RESULT_FAILED;
}
$this
->getEventDispatcher()
->dispatch(new MigrateImportEvent($this->migration, $this->message), MigrateEvents::PRE_IMPORT);
try {
$this->migration
->checkRequirements();
} catch (RequirementsException $e) {
$this->message
->display($this
->t('Migration @id did not meet the requirements. @message', [
'@id' => $this->migration
->id(),
'@message' => $e
->getMessage(),
]), 'error');
return MigrationInterface::RESULT_FAILED;
}
$this->migration
->setStatus(MigrationInterface::STATUS_IMPORTING);
$source = $this
->getSource();
try {
$source
->rewind();
} catch (\Exception $e) {
$this->message
->display($this
->t('Migration failed with source plugin exception: @e in @file line @line', [
'@e' => $e
->getMessage(),
'@file' => $e
->getFile(),
'@line' => $e
->getLine(),
]), 'error');
$this->migration
->setStatus(MigrationInterface::STATUS_IDLE);
return MigrationInterface::RESULT_FAILED;
}
$pipeline = FALSE;
if ($source
->valid()) {
try {
$pipeline = $this->migration
->getProcessPlugins();
} catch (MigrateException $e) {
$row = $source
->current();
$this->sourceIdValues = $row
->getSourceIdValues();
$this
->getIdMap()
->saveIdMapping($row, [], $e
->getStatus());
$this
->saveMessage($e
->getMessage(), $e
->getLevel());
}
}
$return = MigrationInterface::RESULT_COMPLETED;
if ($pipeline) {
$id_map = $this
->getIdMap();
$destination = $this->migration
->getDestinationPlugin();
while ($source
->valid()) {
$row = $source
->current();
$this->sourceIdValues = $row
->getSourceIdValues();
try {
foreach ($pipeline as $destination_property_name => $plugins) {
$this
->processPipeline($row, $destination_property_name, $plugins, NULL);
}
$save = TRUE;
} catch (MigrateException $e) {
$this
->getIdMap()
->saveIdMapping($row, [], $e
->getStatus());
$msg = sprintf("%s:%s:%s", $this->migration
->getPluginId(), $destination_property_name, $e
->getMessage());
$this
->saveMessage($msg, $e
->getLevel());
$save = FALSE;
} catch (MigrateSkipRowException $e) {
if ($e
->getSaveToMap()) {
$id_map
->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);
}
if ($message = trim($e
->getMessage())) {
$msg = sprintf("%s:%s: %s", $this->migration
->getPluginId(), $destination_property_name, $message);
$this
->saveMessage($msg, MigrationInterface::MESSAGE_INFORMATIONAL);
}
$save = FALSE;
}
if ($save) {
try {
$this
->getEventDispatcher()
->dispatch(new MigratePreRowSaveEvent($this->migration, $this->message, $row), MigrateEvents::PRE_ROW_SAVE);
$destination_ids = $id_map
->lookupDestinationIds($this->sourceIdValues);
$destination_id_values = $destination_ids ? reset($destination_ids) : [];
$destination_id_values = $destination
->import($row, $destination_id_values);
$this
->getEventDispatcher()
->dispatch(new MigratePostRowSaveEvent($this->migration, $this->message, $row, $destination_id_values), MigrateEvents::POST_ROW_SAVE);
if ($destination_id_values) {
if ($destination_id_values !== TRUE) {
$id_map
->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $destination
->rollbackAction());
}
}
else {
$id_map
->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
if (!$id_map
->messageCount()) {
$message = $this
->t('New object was not saved, no error provided');
$this
->saveMessage($message);
$this->message
->display($message);
}
}
} catch (MigrateException $e) {
$this
->getIdMap()
->saveIdMapping($row, [], $e
->getStatus());
$this
->saveMessage($e
->getMessage(), $e
->getLevel());
} catch (\Exception $e) {
$this
->getIdMap()
->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
$this
->handleException($e);
}
}
$this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;
if (($return = $this
->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
break;
}
if ($this->migration
->getStatus() == MigrationInterface::STATUS_STOPPING) {
$return = $this->migration
->getInterruptionResult();
$this->migration
->clearInterruptionResult();
break;
}
try {
$source
->next();
} catch (\Exception $e) {
$this->message
->display($this
->t('Migration failed with source plugin exception: @e in @file line @line', [
'@e' => $e
->getMessage(),
'@file' => $e
->getFile(),
'@line' => $e
->getLine(),
]), 'error');
$this->migration
->setStatus(MigrationInterface::STATUS_IDLE);
return MigrationInterface::RESULT_FAILED;
}
}
}
$this
->getEventDispatcher()
->dispatch(new MigrateImportEvent($this->migration, $this->message), MigrateEvents::POST_IMPORT);
$this->migration
->setStatus(MigrationInterface::STATUS_IDLE);
return $return;
}
public function rollback() {
if ($this->migration
->getStatus() !== MigrationInterface::STATUS_IDLE) {
$this->message
->display($this
->t('Migration @id is busy with another operation: @status', [
'@id' => $this->migration
->id(),
'@status' => $this
->t($this->migration
->getStatusLabel()),
]), 'error');
return MigrationInterface::RESULT_FAILED;
}
$this
->getEventDispatcher()
->dispatch(new MigrateRollbackEvent($this->migration), MigrateEvents::PRE_ROLLBACK);
$return = MigrationInterface::RESULT_COMPLETED;
$this->migration
->setStatus(MigrationInterface::STATUS_ROLLING_BACK);
$id_map = $this
->getIdMap();
$destination = $this->migration
->getDestinationPlugin();
$id_map
->rewind();
while ($id_map
->valid()) {
$destination_key = $id_map
->currentDestination();
if ($destination_key) {
$map_row = $id_map
->getRowByDestination($destination_key);
if (!isset($map_row['rollback_action']) || $map_row['rollback_action'] == MigrateIdMapInterface::ROLLBACK_DELETE) {
$this
->getEventDispatcher()
->dispatch(new MigrateRowDeleteEvent($this->migration, $destination_key), MigrateEvents::PRE_ROW_DELETE);
$destination
->rollback($destination_key);
$this
->getEventDispatcher()
->dispatch(new MigrateRowDeleteEvent($this->migration, $destination_key), MigrateEvents::POST_ROW_DELETE);
}
$id_map
->deleteDestination($destination_key);
}
else {
$source_key = $id_map
->currentSource();
$id_map
->delete($source_key);
}
$id_map
->next();
if (($return = $this
->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
break;
}
if ($this->migration
->getStatus() == MigrationInterface::STATUS_STOPPING) {
$return = $this->migration
->getInterruptionResult();
$this->migration
->clearInterruptionResult();
break;
}
}
$this
->getEventDispatcher()
->dispatch(new MigrateRollbackEvent($this->migration), MigrateEvents::POST_ROLLBACK);
$this->migration
->setStatus(MigrationInterface::STATUS_IDLE);
return $return;
}
protected function getIdMap() {
return $this->migration
->getIdMap();
}
public function processRow(Row $row, array $process = NULL, $value = NULL) {
foreach ($this->migration
->getProcessPlugins($process) as $destination => $plugins) {
$this
->processPipeline($row, $destination, $plugins, $value);
}
}
protected function processPipeline(Row $row, string $destination, array $plugins, $value) {
$multiple = FALSE;
foreach ($plugins as $plugin) {
$definition = $plugin
->getPluginDefinition();
if ($multiple && !$definition['handle_multiples']) {
$new_value = [];
if (!is_array($value)) {
throw new MigrateException(sprintf('Pipeline failed at %s plugin for destination %s: %s received instead of an array,', $plugin
->getPluginId(), $destination, $value));
}
$break = FALSE;
foreach ($value as $scalar_value) {
$plugin
->reset();
try {
$new_value[] = $plugin
->transform($scalar_value, $this, $row, $destination);
} catch (MigrateSkipProcessException $e) {
$new_value[] = NULL;
$break = TRUE;
} catch (MigrateException $e) {
$message = sprintf("%s: %s", $plugin
->getPluginId(), $e
->getMessage());
throw new MigrateException($message);
}
if ($plugin
->isPipelineStopped()) {
$break = TRUE;
}
}
$value = $new_value;
if ($break) {
break;
}
}
else {
$plugin
->reset();
try {
$value = $plugin
->transform($value, $this, $row, $destination);
} catch (MigrateSkipProcessException $e) {
$value = NULL;
break;
} catch (MigrateException $e) {
$message = sprintf("%s: %s", $plugin
->getPluginId(), $e
->getMessage());
throw new MigrateException($message);
}
if ($plugin
->isPipelineStopped()) {
break;
}
$multiple = $plugin
->multiple();
}
}
if ($plugins) {
if (isset($value)) {
$row
->setDestinationProperty($destination, $value);
}
else {
$row
->setEmptyDestinationProperty($destination);
}
}
}
protected function currentSourceIds() {
return $this
->getSource()
->getCurrentIds();
}
public function saveMessage($message, $level = MigrationInterface::MESSAGE_ERROR) {
$this
->getIdMap()
->saveMessage($this->sourceIdValues, $message, $level);
}
protected function handleException(\Exception $exception, $save = TRUE) {
$result = Error::decodeException($exception);
$message = $result['@message'] . ' (' . $result['%file'] . ':' . $result['%line'] . ')';
if ($save) {
$this
->saveMessage($message);
}
$this->message
->display($message, 'error');
}
protected function checkStatus() {
if ($this
->memoryExceeded()) {
return MigrationInterface::RESULT_INCOMPLETE;
}
return MigrationInterface::RESULT_COMPLETED;
}
protected function memoryExceeded() {
$usage = $this
->getMemoryUsage();
$pct_memory = $usage / $this->memoryLimit;
if (!($threshold = $this->memoryThreshold)) {
return FALSE;
}
if ($pct_memory > $threshold) {
$this->message
->display($this
->t('Memory usage is @usage (@pct% of limit @limit), reclaiming memory.', [
'@pct' => round($pct_memory * 100),
'@usage' => ByteSizeMarkup::create($usage, NULL, $this->stringTranslation),
'@limit' => ByteSizeMarkup::create($this->memoryLimit, NULL, $this->stringTranslation),
]), 'warning');
$usage = $this
->attemptMemoryReclaim();
$pct_memory = $usage / $this->memoryLimit;
if ($pct_memory > 0.9 * $threshold) {
$this->message
->display($this
->t('Memory usage is now @usage (@pct% of limit @limit), not enough reclaimed, starting new batch', [
'@pct' => round($pct_memory * 100),
'@usage' => ByteSizeMarkup::create($usage, NULL, $this->stringTranslation),
'@limit' => ByteSizeMarkup::create($this->memoryLimit, NULL, $this->stringTranslation),
]), 'warning');
return TRUE;
}
else {
$this->message
->display($this
->t('Memory usage is now @usage (@pct% of limit @limit), reclaimed enough, continuing', [
'@pct' => round($pct_memory * 100),
'@usage' => ByteSizeMarkup::create($usage, NULL, $this->stringTranslation),
'@limit' => ByteSizeMarkup::create($this->memoryLimit, NULL, $this->stringTranslation),
]), 'warning');
return FALSE;
}
}
else {
return FALSE;
}
}
protected function getMemoryUsage() {
return memory_get_usage();
}
protected function attemptMemoryReclaim() {
drupal_static_reset();
\Drupal::service('entity.memory_cache')
->deleteAll();
gc_collect_cycles();
return memory_get_usage();
}
protected function formatSize($size) {
@trigger_error(__METHOD__ . '() is deprecated in drupal:10.2.0 and is removed from drupal:11.0.0. Use \\Drupal\\Core\\StringTranslation\\ByteSizeMarkup::create($size, $langcode) instead. See https://www.drupal.org/node/2999981', E_USER_DEPRECATED);
return format_size($size);
}
}