diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml
new file mode 100644
index 00000000..a3b87ee7
--- /dev/null
+++ b/.github/workflows/php.yml
@@ -0,0 +1,20 @@
+name: PHP Composer
+
+on:
+ push:
+ branches: [ master 4.2 4.1 3.1 1.6 ]
+ pull_request:
+
+jobs:
+ build:
+
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v2
+
+ - name: Install dependencies
+ run: php -r "copy('https://cs.symfony.com/download/php-cs-fixer-v3.phar', 'php-cs-fixer.phar');"
+
+ - name: Run php-cs-fixer
+ run: php php-cs-fixer.phar fix --dry-run --config=.php_cs.php --cache-file=.php_cs.cache --verbose --show-progress=dots --diff --allow-risky=yes
diff --git a/.php_cs.php b/.php_cs.php
index 9c9bd589..010fef1c 100644
--- a/.php_cs.php
+++ b/.php_cs.php
@@ -1,17 +1,18 @@
in(__DIR__);
-return PhpCsFixer\Config::create()
+return (new \PhpCsFixer\Config())
->setFinder($finder)
->setRules(
[
// generic PSRs
'@PSR1' => true,
'@PSR2' => true,
- 'psr0' => true,
- 'psr4' => true,
+ '@PSR12' => true,
+ '@PSR12:risky' => true,
+ 'psr_autoloading' => true,
// imports
'ordered_imports' => true,
@@ -39,11 +40,14 @@
'escape_implicit_backslashes' => false,
// PHP
- '@PHP71Migration' => true,
- '@PHP71Migration:risky' => true,
+ '@PHP74Migration' => true,
+ '@PHP74Migration:risky' => true,
+ '@PHP80Migration' => true,
+ '@PHP80Migration:risky' => true,
+ 'use_arrow_functions' => false,
+ 'get_class_to_class_keyword' => false,
'void_return' => false,
- 'visibility_required' => false,
'list_syntax' => ['syntax' => 'long'],
'declare_strict_types' => false,
@@ -64,11 +68,11 @@
'@Symfony:risky' => true,
'phpdoc_types_order' => false,
'phpdoc_separation' => false,
- 'phpdoc_inline_tag' => false,
+ 'visibility_required' => ['elements' => ['property', 'method']],
+ 'types_spaces' => false,
'native_function_invocation' => false,
'concat_space' => ['spacing' => 'one'],
'single_space_after_construct' => false,
- 'trailing_comma_in_multiline_array' => false,
'self_accessor' => false,
'yoda_style' => false,
'phpdoc_summary' => false,
@@ -87,6 +91,7 @@
'ternary_operator_spaces' => false,
'phpdoc_no_useless_inheritdoc' => false,
'class_definition' => false,
+ 'string_length_to_empty' => false,
]
)
->setRiskyAllowed(true);
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 4dbe8f07..00000000
--- a/.travis.yml
+++ /dev/null
@@ -1,13 +0,0 @@
-language: php
-
-php:
- - '7.4'
- - '8.0'
-
-install:
- - php -r "copy('https://cs.symfony.com/download/php-cs-fixer-v2.phar', 'php-cs-fixer.phar');"
- - php -r "copy('https://squizlabs.github.io/PHP_CodeSniffer/phpcs.phar', 'phpcs.phar');"
-
-script:
- - php php-cs-fixer.phar fix --dry-run --config=.php_cs.php --cache-file=.php_cs.cache --verbose --show-progress=dots --diff --allow-risky=yes
- - php phpcs.phar -p --extensions=php --standard=PSR1,PSR12 --cache=.phpcs.cache --report=code --report=diff -n .
diff --git a/Async/ImportProductProcessor.php b/Async/ImportProductProcessor.php
index b023086c..b56dd02d 100644
--- a/Async/ImportProductProcessor.php
+++ b/Async/ImportProductProcessor.php
@@ -3,13 +3,15 @@
namespace Oro\Bundle\AkeneoBundle\Async;
use Doctrine\ORM\EntityManagerInterface;
+use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\EntityBundle\ORM\DoctrineHelper;
use Oro\Bundle\IntegrationBundle\Authentication\Token\IntegrationTokenAwareTrait;
use Oro\Bundle\IntegrationBundle\Entity\Channel as Integration;
+use Oro\Bundle\IntegrationBundle\Entity\FieldsChanges;
use Oro\Bundle\IntegrationBundle\Provider\SyncProcessorRegistry;
+use Oro\Bundle\MessageQueueBundle\Entity\Job;
use Oro\Component\MessageQueue\Client\TopicSubscriberInterface;
use Oro\Component\MessageQueue\Consumption\MessageProcessorInterface;
-use Oro\Component\MessageQueue\Job\Job;
use Oro\Component\MessageQueue\Job\JobRunner;
use Oro\Component\MessageQueue\Transport\MessageInterface;
use Oro\Component\MessageQueue\Transport\SessionInterface;
@@ -19,6 +21,7 @@
class ImportProductProcessor implements MessageProcessorInterface, TopicSubscriberInterface
{
+ use CacheProviderTrait;
use IntegrationTokenAwareTrait;
/** @var DoctrineHelper */
@@ -97,12 +100,31 @@ public function process(MessageInterface $message, SessionInterface $session)
function (JobRunner $jobRunner, Job $child) use ($integration, $body) {
$this->doctrineHelper->refreshIncludingUnitializedRelations($integration);
$processor = $this->syncProcessorRegistry->getProcessorForIntegration($integration);
+
+ $em = $this->doctrineHelper->getEntityManager(FieldsChanges::class);
+ /** @var FieldsChanges $fieldsChanges */
+ $fieldsChanges = $em
+ ->getRepository(FieldsChanges::class)
+ ->findOneBy(['entityId' => $child->getId(), 'entityClass' => Job::class]);
+
+ if (!$fieldsChanges) {
+ $this->logger->error(
+ sprintf('Source data from Akeneo not found for job: %s', $child->getId())
+ );
+
+ return false;
+ }
+
+ $this->cacheProvider->save('akeneo', $fieldsChanges->getChangedFields());
+
$status = $processor->process(
$integration,
$body['connector'] ?? null,
$body['connector_parameters'] ?? []
);
+ $em->clear(FieldsChanges::class);
+
return $status;
}
);
diff --git a/Command/CleanupCommand.php b/Command/CleanupCommand.php
new file mode 100644
index 00000000..d4e2e794
--- /dev/null
+++ b/Command/CleanupCommand.php
@@ -0,0 +1,104 @@
+doctrineHelper = $doctrineHelper;
+ parent::__construct();
+ }
+
+ public function isActive()
+ {
+ return true;
+ }
+
+ public function getDefaultDefinition()
+ {
+ return '0 2 * * 6';
+ }
+
+ public function configure()
+ {
+ $this
+ ->setDescription('Clears old records from oro_integration_fields_changes table.')
+ ->setHelp(
+ <<<'HELP'
+ The %command.name% command clears fields changes for complete job records
+ from oro_integration_fields_changes table.
+
+ php %command.full_name%
+ HELP
+ );
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output)
+ {
+ $output->writeln(sprintf(
+ 'Number of fields changes that has been deleted: %d',
+ $this->deleteRecords()
+ ));
+
+ $output->writeln('Fields changes cleanup complete');
+ }
+
+ private function deleteRecords(): int
+ {
+ $qb = $this->doctrineHelper
+ ->getEntityManagerForClass(FieldsChanges::class)
+ ->getRepository(FieldsChanges::class)
+ ->createQueryBuilder('fc');
+
+ $qb
+ ->delete(FieldsChanges::class, 'fc')
+ ->where($qb->expr()->eq('fc.entityClass', ':class'))
+ ->setParameter('class', Job::class)
+ ->andWhere($qb->expr()->in('fc.entityId', ':ids'));
+
+ $jqb = $this->doctrineHelper
+ ->getEntityManagerForClass(Job::class)
+ ->getRepository(Job::class)
+ ->createQueryBuilder('j');
+
+ $jqb
+ ->select('j.id')
+ ->where($jqb->expr()->in('j.status', ':statuses'))
+ ->setParameter('statuses', [Job::STATUS_SUCCESS, Job::STATUS_CANCELLED, Job::STATUS_FAILED, Job::STATUS_STALE])
+ ->orderBy($jqb->expr()->desc('j.id'));
+
+ $iterator = new BufferedIdentityQueryResultIterator($jqb->getQuery());
+
+ $result = 0;
+ $iterator->setPageLoadedCallback(function (array $rows) use ($qb, &$result): array {
+ $ids = array_column($rows, 'id');
+
+ $result = $result + $qb->setParameter('ids', $ids)->getQuery()->execute();
+
+ return $ids;
+ });
+
+ iterator_to_array($iterator);
+
+ return $result;
+ }
+}
diff --git a/DependencyInjection/OroAkeneoExtension.php b/DependencyInjection/OroAkeneoExtension.php
index 64d5af6f..cb32ab1e 100644
--- a/DependencyInjection/OroAkeneoExtension.php
+++ b/DependencyInjection/OroAkeneoExtension.php
@@ -22,6 +22,7 @@ class OroAkeneoExtension extends Extension
public function load(array $configs, ContainerBuilder $container)
{
$loader = new Loader\YamlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config'));
+ $loader->load('commands.yml');
$loader->load('integration.yml');
$loader->load('importexport.yml');
$loader->load('services.yml');
diff --git a/ImportExport/Reader/ProductImageReader.php b/ImportExport/Reader/ProductImageReader.php
index 5435aa98..265f629e 100644
--- a/ImportExport/Reader/ProductImageReader.php
+++ b/ImportExport/Reader/ProductImageReader.php
@@ -4,12 +4,14 @@
use Oro\Bundle\AkeneoBundle\ImportExport\AkeneoIntegrationTrait;
use Oro\Bundle\AkeneoBundle\Integration\AkeneoFileManager;
+use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\EntityBundle\ORM\DoctrineHelper;
use Oro\Bundle\ImportExportBundle\Context\ContextInterface;
class ProductImageReader extends IteratorBasedReader
{
use AkeneoIntegrationTrait;
+ use CacheProviderTrait;
/** @var array */
private $attributesImageFilter = [];
@@ -45,10 +47,7 @@ protected function initializeFromContext(ContextInterface $context)
$this->initAttributesImageList();
- $items = $this->stepExecution
- ->getJobExecution()
- ->getExecutionContext()
- ->get('items') ?? [];
+ $items = $this->cacheProvider->fetch('akeneo')['items'] ?? [];
if (!empty($items)) {
$this->processImagesDownload($items, $context);
diff --git a/ImportExport/Reader/ProductPriceReader.php b/ImportExport/Reader/ProductPriceReader.php
index 7d71d917..14038ac4 100644
--- a/ImportExport/Reader/ProductPriceReader.php
+++ b/ImportExport/Reader/ProductPriceReader.php
@@ -2,18 +2,18 @@
namespace Oro\Bundle\AkeneoBundle\ImportExport\Reader;
+use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\ImportExportBundle\Context\ContextInterface;
class ProductPriceReader extends IteratorBasedReader
{
+ use CacheProviderTrait;
+
protected function initializeFromContext(ContextInterface $context)
{
parent::initializeFromContext($context);
- $items = $this->stepExecution
- ->getJobExecution()
- ->getExecutionContext()
- ->get('items') ?? [];
+ $items = $this->cacheProvider->fetch('akeneo')['items'] ?? [];
$prices = [];
diff --git a/ImportExport/Reader/ProductReader.php b/ImportExport/Reader/ProductReader.php
index 4605d2a9..3c4ce8ac 100644
--- a/ImportExport/Reader/ProductReader.php
+++ b/ImportExport/Reader/ProductReader.php
@@ -3,10 +3,13 @@
namespace Oro\Bundle\AkeneoBundle\ImportExport\Reader;
use Oro\Bundle\AkeneoBundle\Integration\AkeneoFileManager;
+use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\ImportExportBundle\Context\ContextInterface;
class ProductReader extends IteratorBasedReader
{
+ use CacheProviderTrait;
+
/** @var AkeneoFileManager */
private $akeneoFileManager;
@@ -19,10 +22,7 @@ protected function initializeFromContext(ContextInterface $context)
{
parent::initializeFromContext($context);
- $items = $this->stepExecution
- ->getJobExecution()
- ->getExecutionContext()
- ->get('items') ?? [];
+ $items = $this->cacheProvider->fetch('akeneo')['items'] ?? [];
if (!empty($items)) {
$this->processFileTypeDownload($items, $context);
diff --git a/ImportExport/Reader/ProductVariantReader.php b/ImportExport/Reader/ProductVariantReader.php
index 4ea8f36f..b28d4379 100644
--- a/ImportExport/Reader/ProductVariantReader.php
+++ b/ImportExport/Reader/ProductVariantReader.php
@@ -2,18 +2,18 @@
namespace Oro\Bundle\AkeneoBundle\ImportExport\Reader;
+use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\ImportExportBundle\Context\ContextInterface;
class ProductVariantReader extends IteratorBasedReader
{
+ use CacheProviderTrait;
+
protected function initializeFromContext(ContextInterface $context)
{
parent::initializeFromContext($context);
- $variants = $this->stepExecution
- ->getJobExecution()
- ->getExecutionContext()
- ->get('variants') ?? [];
+ $variants = $this->cacheProvider->fetch('akeneo')['variants'] ?? [];
$this->stepExecution->setReadCount(count($variants));
diff --git a/ImportExport/Writer/AsyncWriter.php b/ImportExport/Writer/AsyncWriter.php
index 417d9047..86cdd171 100644
--- a/ImportExport/Writer/AsyncWriter.php
+++ b/ImportExport/Writer/AsyncWriter.php
@@ -5,26 +5,27 @@
use Akeneo\Bundle\BatchBundle\Entity\StepExecution;
use Akeneo\Bundle\BatchBundle\Item\ItemWriterInterface;
use Akeneo\Bundle\BatchBundle\Step\StepExecutionAwareInterface;
-use Doctrine\Common\Cache\CacheProvider;
+use Doctrine\DBAL\Platforms\MySqlPlatform;
+use Doctrine\DBAL\Types\Types;
use Oro\Bundle\AkeneoBundle\Async\Topics;
+use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\BatchBundle\Item\Support\ClosableInterface;
+use Oro\Bundle\EntityBundle\ORM\DoctrineHelper;
+use Oro\Bundle\IntegrationBundle\Entity\FieldsChanges;
use Oro\Bundle\MessageQueueBundle\Client\BufferedMessageProducer;
+use Oro\Bundle\MessageQueueBundle\Entity\Job;
use Oro\Component\MessageQueue\Client\Message;
use Oro\Component\MessageQueue\Client\MessagePriority;
use Oro\Component\MessageQueue\Client\MessageProducerInterface;
-use Oro\Component\MessageQueue\Job\Job;
-use Oro\Component\MessageQueue\Job\JobProcessor;
-use Oro\Component\MessageQueue\Job\JobRunner;
class AsyncWriter implements
ItemWriterInterface,
ClosableInterface,
StepExecutionAwareInterface
{
- private const VARIANTS_BATCH_SIZE = 25;
+ use CacheProviderTrait;
- /** @var JobRunner */
- private $jobRunner;
+ private const VARIANTS_BATCH_SIZE = 25;
/** @var MessageProducerInterface * */
private $messageProducer;
@@ -32,31 +33,22 @@ class AsyncWriter implements
/** @var StepExecution */
private $stepExecution;
- /** @var int */
- private $key = 0;
-
/** @var int */
private $size = 0;
- /** @var CacheProvider */
- private $cacheProvider;
-
- /** @var JobProcessor */
- private $jobProcessor;
+ /** @var DoctrineHelper */
+ private $doctrineHelper;
public function __construct(
- JobRunner $jobRunner,
MessageProducerInterface $messageProducer,
- JobProcessor $jobProcessor
+ DoctrineHelper $doctrineHelper
) {
- $this->jobRunner = $jobRunner;
$this->messageProducer = $messageProducer;
- $this->jobProcessor = $jobProcessor;
+ $this->doctrineHelper = $doctrineHelper;
}
public function initialize()
{
- $this->key = 1;
$this->size = 0;
}
@@ -72,57 +64,15 @@ public function write(array $items)
$newSize
);
$this->size = $newSize;
-
$this->stepExecution->setWriteCount($this->size);
- $setRootJob = \Closure::bind(
- function ($property, $value) {
- $this->{$property} = $value;
- },
- $this->jobRunner,
- $this->jobRunner
- );
-
- try {
- $setRootJob('rootJob', $this->getRootJob());
-
- $this->jobRunner->createDelayed(
- $jobName,
- function (JobRunner $jobRunner, Job $child) use ($items, $channelId) {
- $this->messageProducer->send(
- Topics::IMPORT_PRODUCTS,
- new Message(
- [
- 'integrationId' => $channelId,
- 'jobId' => $child->getId(),
- 'connector' => 'product',
- 'connector_parameters' => [
- 'items' => $items,
- 'incremented_read' => true,
- ],
- ],
- MessagePriority::HIGH
- )
- );
-
- if ($this->messageProducer instanceof BufferedMessageProducer
- && $this->messageProducer->isBufferingEnabled()) {
- $this->messageProducer->flushBuffer();
- }
-
- return true;
- }
- );
- } finally {
- $setRootJob('rootJob', null);
-
- $this->key++;
- }
+ $jobId = $this->insertJob($jobName);
+ $this->createFieldsChanges($jobId, $items, 'items');
+ $this->sendMessage($channelId, $jobId, true);
}
public function flush()
{
- $this->key = 1;
$this->size = 0;
$variants = $this->cacheProvider->fetch('product_variants') ?? [];
@@ -132,70 +82,72 @@ public function flush()
$channelId = $this->stepExecution->getJobExecution()->getExecutionContext()->get('channel');
- $setRootJob = \Closure::bind(
- function ($property, $value) {
- $this->{$property} = $value;
- },
- $this->jobRunner,
- $this->jobRunner
+ $chunks = array_chunk($variants, self::VARIANTS_BATCH_SIZE, true);
+
+ foreach ($chunks as $key => $chunk) {
+ $jobName = sprintf(
+ 'oro_integration:sync_integration:%s:variants:%s-%s',
+ $channelId,
+ self::VARIANTS_BATCH_SIZE * $key + 1,
+ self::VARIANTS_BATCH_SIZE * $key + count($chunk)
+ );
+
+ $jobId = $this->insertJob($jobName);
+ $this->createFieldsChanges($jobId, $chunk, 'variants');
+ $this->sendMessage($channelId, $jobId);
+ }
+ }
+
+ private function createFieldsChanges(int $jobId, array &$data, string $key): void
+ {
+ $em = $this->doctrineHelper->getEntityManager(FieldsChanges::class);
+ $fieldsChanges = $em
+ ->getRepository(FieldsChanges::class)
+ ->findOneBy(['entityId' => $jobId, 'entityClass' => Job::class]);
+ if (!$fieldsChanges) {
+ $fieldsChanges = new FieldsChanges([]);
+ $fieldsChanges->setEntityClass(Job::class);
+ $fieldsChanges->setEntityId($jobId);
+ }
+ $fieldsChanges->setChangedFields([$key => $data]);
+ $em->persist($fieldsChanges);
+ $em->flush($fieldsChanges);
+ $em->clear(FieldsChanges::class);
+ }
+
+ private function sendMessage(int $channelId, int $jobId, bool $incrementedRead = false): void
+ {
+ $this->messageProducer->send(
+ Topics::IMPORT_PRODUCTS,
+ new Message(
+ [
+ 'integrationId' => $channelId,
+ 'jobId' => $jobId,
+ 'connector' => 'product',
+ 'connector_parameters' => ['incremented_read' => $incrementedRead],
+ ],
+ MessagePriority::HIGH
+ )
);
- try {
- $setRootJob('rootJob', $this->getRootJob());
- $chunks = array_chunk($variants, self::VARIANTS_BATCH_SIZE, true);
-
- foreach ($chunks as $key => $chunk) {
- $jobName = sprintf(
- 'oro_integration:sync_integration:%s:variants:%s-%s',
- $channelId,
- self::VARIANTS_BATCH_SIZE * $key + 1,
- self::VARIANTS_BATCH_SIZE * $key + count($chunk)
- );
- $this->jobRunner->createDelayed(
- $jobName,
- function (JobRunner $jobRunner, Job $child) use ($channelId, $chunk) {
- $this->messageProducer->send(
- Topics::IMPORT_PRODUCTS,
- new Message(
- [
- 'integrationId' => $channelId,
- 'jobId' => $child->getId(),
- 'connector' => 'product',
- 'connector_parameters' => [
- 'variants' => $chunk,
- ],
- ],
- MessagePriority::HIGH
- )
- );
-
- return true;
- }
- );
- }
- } finally {
- $setRootJob('rootJob', null);
+ if ($this->messageProducer instanceof BufferedMessageProducer
+ && $this->messageProducer->isBufferingEnabled()) {
+ $this->messageProducer->flushBuffer();
}
}
- private function getRootJob(): Job
+ private function getRootJob(): ?int
{
$rootJobId = $this->stepExecution->getJobExecution()->getExecutionContext()->get('rootJobId') ?? null;
if (!$rootJobId) {
throw new \InvalidArgumentException('Root job id is empty');
}
- $rootJob = $this->jobProcessor->findJobById($rootJobId);
- if (!$rootJob) {
- throw new \InvalidArgumentException('Root job is empty');
- }
-
- return $rootJob;
+ return $rootJobId;
}
public function close()
{
- $this->key = 1;
$this->size = 0;
}
@@ -204,8 +156,46 @@ public function setStepExecution(StepExecution $stepExecution)
$this->stepExecution = $stepExecution;
}
- public function setCacheProvider(CacheProvider $cacheProvider): void
+ private function insertJob(string $jobName): ?int
{
- $this->cacheProvider = $cacheProvider;
+ $em = $this->doctrineHelper->getEntityManager(Job::class);
+ $tableName = $em->getClassMetadata(Job::class)->getTableName();
+ $connection = $em->getConnection();
+
+ $qb = $connection->createQueryBuilder();
+ $qb
+ ->insert($tableName)
+ ->values([
+ 'name' => ':name',
+ 'status' => ':status',
+ 'interrupted' => ':interrupted',
+ 'created_at' => ':createdAt',
+ 'root_job_id' => ':rootJob',
+ ])
+ ->setParameters([
+ 'name' => $jobName,
+ 'status' => Job::STATUS_NEW,
+ 'interrupted' => false,
+ 'unique' => false,
+ 'createdAt' => new \DateTime(),
+ 'rootJob' => $this->getRootJob(),
+ ], [
+ 'name' => Types::STRING,
+ 'status' => Types::STRING,
+ 'interrupted' => Types::BOOLEAN,
+ 'unique' => Types::BOOLEAN,
+ 'createdAt' => Types::DATETIME_MUTABLE,
+ 'rootJob' => Types::INTEGER,
+ ]);
+
+ if ($connection->getDatabasePlatform() instanceof MySqlPlatform) {
+ $qb->setValue('`unique`', ':unique');
+ } else {
+ $qb->setValue('"unique"', ':unique');
+ }
+
+ $qb->execute();
+
+ return $connection->lastInsertId();
}
}
diff --git a/Integration/Connector/ProductConnector.php b/Integration/Connector/ProductConnector.php
index 69818fdf..2d8d8720 100644
--- a/Integration/Connector/ProductConnector.php
+++ b/Integration/Connector/ProductConnector.php
@@ -3,6 +3,7 @@
namespace Oro\Bundle\AkeneoBundle\Integration\Connector;
use Oro\Bundle\AkeneoBundle\Placeholder\SchemaUpdateFilter;
+use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\IntegrationBundle\Entity\Channel;
use Oro\Bundle\IntegrationBundle\Provider\AbstractConnector;
use Oro\Bundle\IntegrationBundle\Provider\AllowedConnectorInterface;
@@ -14,6 +15,8 @@
*/
class ProductConnector extends AbstractConnector implements ConnectorInterface, AllowedConnectorInterface
{
+ use CacheProviderTrait;
+
const IMPORT_JOB_NAME = 'akeneo_product_import';
const PAGE_SIZE = 100;
const TYPE = 'product';
@@ -73,20 +76,12 @@ public function setSchemaUpdateFilter(SchemaUpdateFilter $schemaUpdateFilter): v
*/
protected function getConnectorSource()
{
- $items = $this->stepExecution
- ->getJobExecution()
- ->getExecutionContext()
- ->get('items');
-
+ $items = $this->cacheProvider->fetch('akeneo')['items'] ?? [];
if ($items) {
return new \ArrayIterator();
}
- $variants = $this->stepExecution
- ->getJobExecution()
- ->getExecutionContext()
- ->get('variants');
-
+ $variants = $this->cacheProvider->fetch('akeneo')['variants'] ?? [];
if ($variants) {
return new \ArrayIterator();
}
diff --git a/Resources/config/commands.yml b/Resources/config/commands.yml
new file mode 100644
index 00000000..60d55f6d
--- /dev/null
+++ b/Resources/config/commands.yml
@@ -0,0 +1,9 @@
+services:
+ _defaults:
+ public: false
+
+ Oro\Bundle\AkeneoBundle\Command\CleanupCommand:
+ arguments:
+ - '@oro_entity.doctrine_helper'
+ tags:
+ - { name: console.command }
diff --git a/Resources/config/importexport.yml b/Resources/config/importexport.yml
index 83ae9f09..2243189c 100644
--- a/Resources/config/importexport.yml
+++ b/Resources/config/importexport.yml
@@ -210,6 +210,7 @@ services:
- '@oro_integration.provider.connector_context_mediator'
calls:
- [ setSchemaUpdateFilter, [ '@oro_akeneo.placeholder.schema_update_filter' ] ]
+ - [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]
tags:
- { name: oro_integration.connector, type: product, channel_type: oro_akeneo }
@@ -312,9 +313,8 @@ services:
oro_akeneo.importexport.writer.async_product:
class: 'Oro\Bundle\AkeneoBundle\ImportExport\Writer\AsyncWriter'
arguments:
- - '@oro_message_queue.job.runner'
- '@oro_message_queue.message_producer'
- - '@oro_message_queue.job.processor'
+ - '@oro_entity.doctrine_helper'
calls:
- [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]
@@ -336,11 +336,14 @@ services:
- '@oro_importexport.context_registry'
calls:
- [ setAkeneoFileManager, [ '@oro_akeneo.integration.akeneo_file_manager' ] ]
+ - [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]
oro_akeneo.importexport.reader.price:
class: 'Oro\Bundle\AkeneoBundle\ImportExport\Reader\ProductPriceReader'
arguments:
- '@oro_importexport.context_registry'
+ calls:
+ - [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]
oro_akeneo.importexport.reader.product_image:
class: 'Oro\Bundle\AkeneoBundle\ImportExport\Reader\ProductImageReader'
@@ -349,11 +352,14 @@ services:
calls:
- [ setDoctrineHelper, [ '@oro_entity.doctrine_helper' ] ]
- [ setAkeneoFileManager, [ '@oro_akeneo.integration.akeneo_file_manager' ] ]
+ - [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]
oro_akeneo.importexport.reader.product_variant:
class: 'Oro\Bundle\AkeneoBundle\ImportExport\Reader\ProductVariantReader'
arguments:
- '@oro_importexport.context_registry'
+ calls:
+ - [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]
oro_akeneo.importexport.reader.category_parent:
class: 'Oro\Bundle\AkeneoBundle\ImportExport\Reader\CategoryParentReader'
diff --git a/Resources/config/services.yml b/Resources/config/services.yml
index 7b6ff9bc..e8346f13 100644
--- a/Resources/config/services.yml
+++ b/Resources/config/services.yml
@@ -123,6 +123,8 @@ services:
- '@security.token_storage'
- '@logger'
- '@oro_integration.processor_registry'
+ calls:
+ - [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]
tags:
- { name: 'oro_message_queue.client.message_processor', topicName: 'oro.integration.akeneo.product' }
diff --git a/Tools/CacheProviderTrait.php b/Tools/CacheProviderTrait.php
new file mode 100644
index 00000000..6b1d51b8
--- /dev/null
+++ b/Tools/CacheProviderTrait.php
@@ -0,0 +1,16 @@
+cacheProvider = $cacheProvider;
+ }
+}