From f31459c20564d009fa49ce576ca1c79849c17288 Mon Sep 17 00:00:00 2001 From: Davis Puciriuss Date: Tue, 7 May 2024 12:50:12 +0300 Subject: [PATCH 01/12] Added "mateusjunges/laravel-trackable-jobs" package, updated larastan package name. Makefile added option to run larastan seperately. gitignore added "/build/". --- .env.example | 2 + .gitignore | 3 +- Makefile | 12 ++++-- composer.json | 3 +- docker-compose.yml | 3 +- docker/app/Dockerfile | 3 +- phpstan.neon.dist | 2 +- tests/IntegrationTestCase.php | 1 + tests/TestCase.php | 9 ++++- ...el_trackable_create_tracked_jobs_table.php | 39 +++++++++++++++++++ 10 files changed, 65 insertions(+), 12 deletions(-) create mode 100644 tests/laravel/database/migrations/2019_15_02_000002_laravel_trackable_create_tracked_jobs_table.php diff --git a/.env.example b/.env.example index 43b2d8d0..4fa16080 100644 --- a/.env.example +++ b/.env.example @@ -2,6 +2,8 @@ APP_ENV=development XDEBUG_CONFIG=remote_port=9001 remote_host=172.17.0.1 remote_autostart=1 remote_log=/home/user/xdebug.log PHP_IDE_CONFIG=serverName=Scout ELASTICSEARCH_HOST=elasticsearch:9200 +ELASTICSEARCH_USER=elasticsearch +ELASTICSEARCH_PASSWORD=password DB_HOST=db DB_PORT=3306 DB_DATABASE=my_database diff --git a/.gitignore b/.gitignore index aa4fb319..c5ae7a65 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ composer.lock .env .phpunit.result.cache -/vendor/ \ No newline at end of file +/vendor/ +/build/ \ No newline at end of file diff --git a/Makefile b/Makefile index 3bca6dd4..131c6fe8 100644 --- a/Makefile +++ b/Makefile @@ -27,24 +27,28 @@ up: ## Start all containers (in background) for development $(docker_compose_bin) up -d down: ## Stop all started for development containers - $(docker_compose_bin) down + $(docker_compose_bin) down -v restart: up ## Restart all started for development containers $(docker_compose_bin) restart +rebuild: down + $(docker_compose_bin) up --build -d + shell: up ## Start shell into application container $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" /bin/sh install: up ## Install application dependencies into application container $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" composer install --no-interaction --ansi -test: up ## Execute application tests +larastan: up $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" ./vendor/bin/phpstan analyze --memory-limit=4000M + +test: up ## Execute application tests $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" ./vendor/bin/phpunit --testdox --stop-on-failure test-coverage: up ## Execute application tests and generate report - $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" ./vendor/bin/phpstan analyze $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" ./vendor/bin/phpunit --coverage-html build/coverage-report test-filter: - $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" ./vendor/bin/phpunit --filter=$(filter) --testdox + $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" ./vendor/bin/phpunit --filter=$(filter) --testdox \ No newline at end of file diff --git a/composer.json b/composer.json index ed44db61..96bc49a8 100644 --- a/composer.json +++ b/composer.json @@ -20,11 +20,12 @@ "elasticsearch/elasticsearch": "^8.0", "handcraftedinthealps/elasticsearch-dsl": "^8.0", "laravel/scout": "^8.0|^9.0|^10.0", + "mateusjunges/laravel-trackable-jobs": "^1.6", "roave/better-reflection": "^4.3|^5.0|^6.18" }, "require-dev": { + "larastan/larastan": "^2.9", "laravel/legacy-factories": "^1.0", - "nunomaduro/larastan": "^2.4", "orchestra/testbench": "^6.17|^7.0|^8.0", "php-http/guzzle7-adapter": "^1.0", "phpunit/phpunit": "^9.4.0" diff --git a/docker-compose.yml b/docker-compose.yml index a843ddf3..f39271cc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3.4' - services: app: build: @@ -50,6 +48,7 @@ services: - bootstrap.memory_lock=true - action.destructive_requires_name=false - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + - xpack.security.enabled=false ulimits: memlock: soft: -1 diff --git a/docker/app/Dockerfile b/docker/app/Dockerfile index 0c5577c3..2a3ae386 100644 --- a/docker/app/Dockerfile +++ b/docker/app/Dockerfile @@ -41,9 +41,10 @@ RUN echo "xdebug.remote_connect_back=0" >> /usr/local/etc/php/conf.d/docker-php- RUN echo "xdebug.idekey=\"PHPSTORM\"" >> /usr/local/etc/php/conf.d/docker-php-ext-xdebug.ini RUN echo "xdebug.remote_port=9000" >> /usr/local/etc/php/conf.d/docker-php-ext-xdebug.ini RUN echo "xdebug.remote_autostart=1" >> /usr/local/etc/php/conf.d/docker-php-ext-xdebug.ini +RUN echo "xdebug.mode=debug,coverage" >> /usr/local/etc/php/conf.d/docker-php-ext-xdebug.ini RUN apt-get update && apt-get install -y procps -RUN apt-get update && apt-get install -y netcat +RUN apt-get update && apt-get install -y netcat-traditional RUN apt-get update && apt-get install -y net-tools FROM php as php-testing diff --git a/phpstan.neon.dist b/phpstan.neon.dist index 8143f7a6..5345d5fb 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -1,5 +1,5 @@ includes: - - vendor/nunomaduro/larastan/extension.neon + - vendor/larastan/larastan/extension.neon parameters: level: max paths: diff --git a/tests/IntegrationTestCase.php b/tests/IntegrationTestCase.php index d8e78e21..1cd5d7bc 100644 --- a/tests/IntegrationTestCase.php +++ b/tests/IntegrationTestCase.php @@ -32,6 +32,7 @@ protected function getEnvironmentSetUp($app) parent::getEnvironmentSetUp($app); $app['config']->set('elasticsearch', require(__DIR__.'/../config/elasticsearch.php')); + $app['config']->set('trackable-job', require(__DIR__.'/../vendor/mateusjunges/laravel-trackable-jobs/config/trackable-jobs.php')); $app['config']->set('elasticsearch.indices.mappings.products', [ 'properties' => [ 'type' => [ diff --git a/tests/TestCase.php b/tests/TestCase.php index d0fb948f..6e695fe8 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -4,11 +4,11 @@ namespace Tests; -use Illuminate\Support\Facades\Artisan; use Laravel\Scout\ScoutServiceProvider; use Matchish\ScoutElasticSearch\ElasticSearchServiceProvider; use Matchish\ScoutElasticSearch\Engines\ElasticSearchEngine; use Matchish\ScoutElasticSearch\ScoutElasticSearchServiceProvider; +use Junges\TrackableJobs\Providers\TrackableJobsServiceProvider; use Orchestra\Testbench\TestCase as BaseTestCase; abstract class TestCase extends BaseTestCase @@ -21,7 +21,7 @@ public function setUp(): void $this->withFactories(database_path('factories')); - Artisan::call('migrate:fresh', ['--database' => 'mysql']); + \Artisan::call('migrate:fresh', ['--database' => 'mysql']); } /** @@ -34,9 +34,13 @@ protected function getEnvironmentSetUp($app) { $app['config']->set('scout.driver', ElasticSearchEngine::class); $app['config']->set('scout.chunk.searchable', 3); + $app['config']->set('scout.chunk.handlers', 1); $app['config']->set('scout.queue', false); // Setup default database to use sqlite :memory: $app['config']->set('database.default', 'mysql'); + $app['config']->set('elasticsearch.host', 'elasticsearch:9200'); + $app['config']->set('elasticsearch.user', 'elasticsearch'); + $app['config']->set('elasticsearch.password', 'password'); } protected function getPackageProviders($app) @@ -45,6 +49,7 @@ protected function getPackageProviders($app) ScoutServiceProvider::class, ScoutElasticSearchServiceProvider::class, ElasticSearchServiceProvider::class, + TrackableJobsServiceProvider::class, ]; } } diff --git a/tests/laravel/database/migrations/2019_15_02_000002_laravel_trackable_create_tracked_jobs_table.php b/tests/laravel/database/migrations/2019_15_02_000002_laravel_trackable_create_tracked_jobs_table.php new file mode 100644 index 00000000..b9c6e078 --- /dev/null +++ b/tests/laravel/database/migrations/2019_15_02_000002_laravel_trackable_create_tracked_jobs_table.php @@ -0,0 +1,39 @@ +table_name = config('trackable-jobs.tables.tracked_jobs', 'tracked_jobs'); + $this->usingUuid = config('trackable-jobs.using_uuid', false); + } + + public function up(): void + { + Schema::create($this->table_name, function (Blueprint $table) { + $this->usingUuid + ? $table->uuid('uuid')->primary() + : $table->id(); + $table->string('trackable_id')->index(); + $table->string('trackable_type')->index(); + $table->string('name'); + $table->string('status')->default('queued'); + $table->longText('output')->nullable(); + $table->timestamp('started_at')->nullable(); + $table->timestamp('finished_at')->nullable(); + $table->timestamps(); + }); + } + + public function down(): void + { + Schema::dropIfExists($this->table_name); + } +}; From 78ee101fd0399a7af4cc3919b2f2b79a19e46c99 Mon Sep 17 00:00:00 2001 From: Davis Puciriuss Date: Tue, 7 May 2024 12:51:47 +0300 Subject: [PATCH 02/12] Removed ChunkScope, added FromScope. --- src/Database/Scopes/ChunkScope.php | 56 ------------------------------ src/Database/Scopes/FromScope.php | 44 +++++++++++++++++++++++ src/Database/Scopes/PageScope.php | 5 +-- 3 files changed, 47 insertions(+), 58 deletions(-) delete mode 100644 src/Database/Scopes/ChunkScope.php create mode 100644 src/Database/Scopes/FromScope.php diff --git a/src/Database/Scopes/ChunkScope.php b/src/Database/Scopes/ChunkScope.php deleted file mode 100644 index 049c4917..00000000 --- a/src/Database/Scopes/ChunkScope.php +++ /dev/null @@ -1,56 +0,0 @@ -start = $start; - $this->end = $end; - } - - /** - * Apply the scope to a given Eloquent query builder. - * - * @param \Illuminate\Database\Eloquent\Builder $builder - * @param \Illuminate\Database\Eloquent\Model $model - * @return void - */ - public function apply(Builder $builder, Model $model) - { - $start = $this->start; - $end = $this->end; - $builder - ->when(! is_null($start), function ($query) use ($start, $model) { - return $query->where($model->getKeyName(), '>', $start); - }) - ->when(! is_null($end), function ($query) use ($end, $model) { - return $query->where($model->getKeyName(), '<=', $end); - }); - } - - public function key(): string - { - return static::class; - } -} diff --git a/src/Database/Scopes/FromScope.php b/src/Database/Scopes/FromScope.php new file mode 100644 index 00000000..70bff430 --- /dev/null +++ b/src/Database/Scopes/FromScope.php @@ -0,0 +1,44 @@ +lastId = $lastId; + $this->perPage = $perPage; + } + + /** + * Apply the scope to a given Eloquent query builder. + * + * @param Builder $builder + * @param Model $model + * @return void + */ + public function apply(Builder $builder, Model $model) + { + $builder->forPageAfterId($this->perPage, $this->lastId); + } +} diff --git a/src/Database/Scopes/PageScope.php b/src/Database/Scopes/PageScope.php index 6522f0d7..50b8414f 100644 --- a/src/Database/Scopes/PageScope.php +++ b/src/Database/Scopes/PageScope.php @@ -5,6 +5,7 @@ use Illuminate\Database\Eloquent\Builder; use Illuminate\Database\Eloquent\Model; use Illuminate\Database\Eloquent\Scope; +use Laravel\Scout\Searchable; class PageScope implements Scope { @@ -32,8 +33,8 @@ public function __construct(int $page, int $perPage) /** * Apply the scope to a given Eloquent query builder. * - * @param \Illuminate\Database\Eloquent\Builder $builder - * @param \Illuminate\Database\Eloquent\Model $model + * @param Builder $builder + * @param Model $model * @return void */ public function apply(Builder $builder, Model $model) From f27c1f0a3dfa6f6e6e5f1fed22f841adf259d182 Mon Sep 17 00:00:00 2001 From: Davis Puciriuss Date: Tue, 7 May 2024 13:09:19 +0300 Subject: [PATCH 03/12] Updated ImportCommand to include "--parallel" option. Updated StageInterface, updated existing stages to include the new interface methods. Changed PullFromSource to instead of being multiple instance, be one and iterable. Added PullFromSourceParallel to handle parallel import. --- src/Console/Commands/ImportCommand.php | 41 ++++- src/Jobs/Import.php | 33 +++- src/Jobs/ImportStages.php | 21 ++- src/Jobs/ProcessSearchable.php | 53 ++++++ src/Jobs/QueueableJob.php | 3 +- src/Jobs/Stages/CleanUp.php | 15 +- src/Jobs/Stages/CreateWriteIndex.php | 10 ++ src/Jobs/Stages/PullFromSource.php | 36 +++- src/Jobs/Stages/PullFromSourceParallel.php | 158 ++++++++++++++++++ src/Jobs/Stages/RefreshIndex.php | 10 ++ src/Jobs/Stages/StageInterface.php | 20 +++ .../Stages/SwitchToNewAndRemoveOldIndex.php | 33 +++- src/Searchable/DefaultImportSource.php | 112 +++++++++++-- src/Searchable/ImportSource.php | 37 +++- 14 files changed, 535 insertions(+), 47 deletions(-) create mode 100644 src/Jobs/ProcessSearchable.php create mode 100644 src/Jobs/Stages/PullFromSourceParallel.php diff --git a/src/Console/Commands/ImportCommand.php b/src/Console/Commands/ImportCommand.php index fbdf0675..bc504689 100644 --- a/src/Console/Commands/ImportCommand.php +++ b/src/Console/Commands/ImportCommand.php @@ -9,7 +9,6 @@ use Matchish\ScoutElasticSearch\ElasticSearch\Config\Config; use Matchish\ScoutElasticSearch\Jobs\Import; use Matchish\ScoutElasticSearch\Jobs\QueueableJob; -use Matchish\ScoutElasticSearch\Searchable\ImportSource; use Matchish\ScoutElasticSearch\Searchable\ImportSourceFactory; use Matchish\ScoutElasticSearch\Searchable\SearchableListFactory; @@ -18,7 +17,8 @@ final class ImportCommand extends Command /** * {@inheritdoc} */ - protected $signature = 'scout:import {searchable?* : The name of the searchable}'; + protected $signature = 'scout:import {searchable?* : The name of the searchable} {--P|parallel : Index items in parallel}'; + /** * {@inheritdoc} */ @@ -29,12 +29,23 @@ final class ImportCommand extends Command */ public function handle(): void { + $parallel = false; + + if($this->option('parallel')) { + $parallel = true; + } + $this->searchableList((array) $this->argument('searchable')) - ->each(function ($searchable) { - $this->import($searchable); + ->each(function (string $searchable) use ($parallel) { + $this->import($searchable, $parallel); }); } + /** + * @param array $argument + * + * @return Collection + */ private function searchableList(array $argument): Collection { return collect($argument)->whenEmpty(function () { @@ -44,18 +55,32 @@ private function searchableList(array $argument): Collection }); } - private function import(string $searchable): void + /** + * @param string $searchable + * @param bool $parallel + * + * @return void + */ + private function import(string $searchable, bool $parallel): void { $sourceFactory = app(ImportSourceFactory::class); $source = $sourceFactory::from($searchable); $job = new Import($source); - $job->timeout = Config::queueTimeout(); + /** @var int|null $queueTimeout */ + $queueTimeout = Config::queueTimeout(); + if($queueTimeout !== null) { + $job->timeout = (int) $queueTimeout; + } + $job->parallel = $parallel; if (config('scout.queue')) { $job = (new QueueableJob())->chain([$job]); - $job->timeout = Config::queueTimeout(); + /** @var int|null $queueTimeout */ + $queueTimeout = Config::queueTimeout(); + if($queueTimeout !== null) { + $job->timeout = (int) $queueTimeout; + } } - $bar = (new ProgressBarFactory($this->output))->create(); $job->withProgressReport($bar); diff --git a/src/Jobs/Import.php b/src/Jobs/Import.php index 026e5737..c176d11b 100644 --- a/src/Jobs/Import.php +++ b/src/Jobs/Import.php @@ -21,6 +21,10 @@ final class Import * @var ImportSource */ private $source; + /** + * @var boolean + */ + public $parallel = false; public ?int $timeout = null; @@ -40,16 +44,31 @@ public function handle(Client $elasticsearch): void $stages = $this->stages(); $estimate = $stages->sum->estimate(); $this->progressBar()->setMaxSteps($estimate); - $stages->each(function ($stage) use ($elasticsearch) { - /** @var StageInterface $stage */ - $this->progressBar()->setMessage($stage->title()); - $stage->handle($elasticsearch); - $this->progressBar()->advance($stage->estimate()); - }); + + /** @var StageInterface $currentStage */ + $currentStage = $stages->shift(); + + while ($currentStage !== null) { + $this->progressBar()->setMessage($currentStage->title()); + $currentStage->handle($elasticsearch); + $this->progressBar()->advance($currentStage->advance()); + if($currentStage->completed()) { + if($stages->isEmpty()) { + /** @var null $currentStage */ + $currentStage = null; + } else { + /** @var StageInterface $currentStage */ + $currentStage = $stages->shift(); + } + } + } } + /** + * @return Collection + */ private function stages(): Collection { - return ImportStages::fromSource($this->source); + return ImportStages::fromSource($this->source, $this->parallel); } } diff --git a/src/Jobs/ImportStages.php b/src/Jobs/ImportStages.php index 78fe5632..12a7b35a 100644 --- a/src/Jobs/ImportStages.php +++ b/src/Jobs/ImportStages.php @@ -3,24 +3,41 @@ namespace Matchish\ScoutElasticSearch\Jobs; use Illuminate\Support\Collection; +use Matchish\ScoutElasticSearch\Jobs\Stages\StageInterface; use Matchish\ScoutElasticSearch\ElasticSearch\Index; use Matchish\ScoutElasticSearch\Jobs\Stages\CleanUp; use Matchish\ScoutElasticSearch\Jobs\Stages\CreateWriteIndex; use Matchish\ScoutElasticSearch\Jobs\Stages\PullFromSource; +use Matchish\ScoutElasticSearch\Jobs\Stages\PullFromSourceParallel; use Matchish\ScoutElasticSearch\Jobs\Stages\RefreshIndex; use Matchish\ScoutElasticSearch\Jobs\Stages\SwitchToNewAndRemoveOldIndex; use Matchish\ScoutElasticSearch\Searchable\ImportSource; +/** + * @extends Collection + */ class ImportStages extends Collection { /** * @param ImportSource $source - * @return Collection + * @param bool $parallel + * + * @return Collection */ - public static function fromSource(ImportSource $source) + public static function fromSource(ImportSource $source, bool $parallel = false) { $index = Index::fromSource($source); + if($parallel) { + return (new self([ + new CleanUp($source), + new CreateWriteIndex($source, $index), + PullFromSourceParallel::chunked($source), + new RefreshIndex($index), + new SwitchToNewAndRemoveOldIndex($source, $index), + ]))->flatten()->filter(); + } + return (new self([ new CleanUp($source), new CreateWriteIndex($source, $index), diff --git a/src/Jobs/ProcessSearchable.php b/src/Jobs/ProcessSearchable.php new file mode 100644 index 00000000..adc66c3c --- /dev/null +++ b/src/Jobs/ProcessSearchable.php @@ -0,0 +1,53 @@ + + */ + private $data; + + /** + * @param Collection $data + */ + public function __construct(Collection $data) + { + $this->__baseConstruct($data->first()); + $this->data = $data; + } + + /** + * Handles the job execution. + * + * @return void + */ + public function handle(): void + { + /** @var Model|Searchable $model */ + $model = $this->data->first(); + + /** @var \Laravel\Scout\Engines\Engine $engine */ + $engine = $model->searchableUsing(); + + $engine->update($this->data); // @phpstan-ignore-line + } +} \ No newline at end of file diff --git a/src/Jobs/QueueableJob.php b/src/Jobs/QueueableJob.php index d799f7de..d000b61e 100644 --- a/src/Jobs/QueueableJob.php +++ b/src/Jobs/QueueableJob.php @@ -8,8 +8,7 @@ class QueueableJob implements ShouldQueue { - use Queueable; - use ProgressReportable; + use Queueable, ProgressReportable; public ?int $timeout = null; diff --git a/src/Jobs/Stages/CleanUp.php b/src/Jobs/Stages/CleanUp.php index da674c9d..7258461f 100644 --- a/src/Jobs/Stages/CleanUp.php +++ b/src/Jobs/Stages/CleanUp.php @@ -3,6 +3,7 @@ namespace Matchish\ScoutElasticSearch\Jobs\Stages; use Elastic\Elasticsearch\Client; +use Elastic\Elasticsearch\Response\Elasticsearch; use Elastic\Elasticsearch\Exception\ClientResponseException; use Matchish\ScoutElasticSearch\ElasticSearch\Params\Indices\Alias\Get as GetAliasParams; use Matchish\ScoutElasticSearch\ElasticSearch\Params\Indices\Delete as DeleteIndexParams; @@ -31,7 +32,9 @@ public function handle(Client $elasticsearch): void $source = $this->source; $params = GetAliasParams::anyIndex($source->searchableAs()); try { - $response = $elasticsearch->indices()->getAlias($params->toArray())->asArray(); + /** @var Elasticsearch $elasticResponse */ + $elasticResponse = $elasticsearch->indices()->getAlias($params->toArray()); + $response = $elasticResponse->asArray(); } catch (ClientResponseException $e) { $response = []; } @@ -55,4 +58,14 @@ public function estimate(): int { return 1; } + + public function advance(): int + { + return 1; + } + + public function completed(): bool + { + return true; + } } diff --git a/src/Jobs/Stages/CreateWriteIndex.php b/src/Jobs/Stages/CreateWriteIndex.php index 1db3b48d..5ee111f9 100644 --- a/src/Jobs/Stages/CreateWriteIndex.php +++ b/src/Jobs/Stages/CreateWriteIndex.php @@ -61,4 +61,14 @@ public function estimate(): int { return 1; } + + public function advance(): int + { + return 1; + } + + public function completed(): bool + { + return true; + } } diff --git a/src/Jobs/Stages/PullFromSource.php b/src/Jobs/Stages/PullFromSource.php index 6e6a7c5e..b09ff351 100644 --- a/src/Jobs/Stages/PullFromSource.php +++ b/src/Jobs/Stages/PullFromSource.php @@ -3,7 +3,8 @@ namespace Matchish\ScoutElasticSearch\Jobs\Stages; use Elastic\Elasticsearch\Client; -use Illuminate\Support\Collection; +use Matchish\ScoutElasticSearch\Database\Scopes\FromScope; +use Matchish\ScoutElasticSearch\Database\Scopes\PageScope; use Matchish\ScoutElasticSearch\Searchable\ImportSource; /** @@ -16,6 +17,11 @@ final class PullFromSource implements StageInterface */ private $source; + /** + * @var int + */ + private $handledChunks = 0; + /** * @param ImportSource $source */ @@ -26,13 +32,24 @@ public function __construct(ImportSource $source) public function handle(Client $elasticsearch = null): void { + $this->handledChunks++; $results = $this->source->get()->filter->shouldBeSearchable(); if (! $results->isEmpty()) { $results->first()->searchableUsing()->update($results); + if ($results->first()->getKeyType() !== 'int') { + $this->source->setChunkScope(new PageScope($this->handledChunks, $this->source->getChunkSize())); + } else { + $this->source->setChunkScope(new FromScope($results->last()->getKey(), $this->source->getChunkSize())); + } } } public function estimate(): int + { + return $this->source->getTotalChunks() + 1; + } + + public function advance(): int { return 1; } @@ -42,14 +59,21 @@ public function title(): string return 'Indexing...'; } + public function completed(): bool + { + return ($this->handledChunks - 1) >= $this->source->getTotalChunks(); + } + /** * @param ImportSource $source - * @return Collection + * @return PullFromSource */ - public static function chunked(ImportSource $source): Collection + public static function chunked(ImportSource $source): ?PullFromSource { - return $source->chunked()->map(function ($chunk) { - return new static($chunk); - }); + $source = $source->chunked(); + if($source === null) { + return null; + } + return new static($source); } } diff --git a/src/Jobs/Stages/PullFromSourceParallel.php b/src/Jobs/Stages/PullFromSourceParallel.php new file mode 100644 index 00000000..427d27b1 --- /dev/null +++ b/src/Jobs/Stages/PullFromSourceParallel.php @@ -0,0 +1,158 @@ + + */ + private $queues = [ + 'import_1', + ]; + + /** + * @param ImportSource $source + */ + public function __construct(ImportSource $source) + { + $this->source = $source; + $this->queues = collect(config('scout.chunk.handlers', self::DEFAULT_HANDLER_COUNT))->map(function($i) { + return 'import_'.$i; + })->toArray(); + } + + /** + * @return string + */ + private function getNextQueue(): string + { + /** @var string $queue */ + $queue = array_shift($this->queues); + $this->queues[] = $queue; + return $queue; + } + + /** + * {@inheritdoc} + */ + public function handle(Client $elasticsearch = null): void + { + if(count($this->dispatchedJobIds) > 0) { + $jobs = TrackedJob::findMany($this->dispatchedJobIds); + $finishedJobs = $jobs->filter(function($job) { + return $job->status === TrackedJob::STATUS_FINISHED; + }); + $this->handledJobs = array_merge($this->handledJobs, $finishedJobs->pluck('id')->toArray()); + $this->advanceBy += $finishedJobs->count(); + $this->dispatchedJobIds = $jobs->filter(function($job) { + return $job->status !== TrackedJob::STATUS_FINISHED; + })->pluck('id')->toArray(); + } + + if(count($this->dispatchedJobIds) >= $this->source->getTotalChunks()) { + return; + } + + $results = $this->source->get()->filter->shouldBeSearchable(); + + if (! $results->isEmpty()) { + $job = new ProcessSearchable($results); + dispatch($job)->onQueue($this->getNextQueue())->onConnection($this->source->syncWithSearchUsing()); + $this->dispatchedJobIds[] = $job->trackedJob->getKey(); + if ($results->first()->getKeyType() !== 'int') { + $this->source->setChunkScope( + new PageScope( + count($this->handledJobs) + count($this->dispatchedJobIds), + $this->source->getChunkSize()) + ); + } else { + $this->source->setChunkScope(new FromScope($results->last()->getKey(), $this->source->getChunkSize())); + } + } + } + + /** + * {@inheritdoc} + */ + public function estimate(): int + { + return $this->source->getTotalChunks(); + } + + /** + * {@inheritdoc} + */ + public function advance(): int + { + $advance = $this->advanceBy; + $this->advanceBy = 0; + return $advance; + } + + /** + * {@inheritdoc} + */ + public function title(): string + { + return 'Indexing...'; + } + + /** + * {@inheritdoc} + */ + public function completed(): bool + { + return count($this->handledJobs) >= $this->source->getTotalChunks(); + } + + /** + * @param ImportSource $source + * @return PullFromSourceParallel|null + */ + public static function chunked(ImportSource $source): PullFromSourceParallel|null + { + $source = $source->chunked(); + + if($source === null) { + return null; + } + + return new static($source); + } +} diff --git a/src/Jobs/Stages/RefreshIndex.php b/src/Jobs/Stages/RefreshIndex.php index 6a9e7aae..5ea4eac8 100644 --- a/src/Jobs/Stages/RefreshIndex.php +++ b/src/Jobs/Stages/RefreshIndex.php @@ -41,4 +41,14 @@ public function title(): string { return 'Refreshing index'; } + + public function advance(): int + { + return 1; + } + + public function completed(): bool + { + return true; + } } diff --git a/src/Jobs/Stages/StageInterface.php b/src/Jobs/Stages/StageInterface.php index e5cff307..126cee4a 100644 --- a/src/Jobs/Stages/StageInterface.php +++ b/src/Jobs/Stages/StageInterface.php @@ -6,9 +6,29 @@ interface StageInterface { + /** + * @return string + */ public function title(): string; + /** + * @return int + */ public function estimate(): int; + /** + * @return int + */ + public function advance(): int; + + /** + * @return bool + */ + public function completed(): bool; + + /** + * @param Client $elasticsearch + * @return void + */ public function handle(Client $elasticsearch): void; } diff --git a/src/Jobs/Stages/SwitchToNewAndRemoveOldIndex.php b/src/Jobs/Stages/SwitchToNewAndRemoveOldIndex.php index eae35aac..da218d04 100644 --- a/src/Jobs/Stages/SwitchToNewAndRemoveOldIndex.php +++ b/src/Jobs/Stages/SwitchToNewAndRemoveOldIndex.php @@ -1,8 +1,11 @@ index = $index; } + /** + * {@inheritdoc} + */ public function handle(Client $elasticsearch): void { $source = $this->source; $params = Get::anyIndex($source->searchableAs()); - $response = $elasticsearch->indices()->getAlias($params->toArray())->asArray(); + /** @var Elasticsearch $elasticResponse */ + $elasticResponse = $elasticsearch->indices()->getAlias($params->toArray()); + $response = $elasticResponse->asArray(); $params = new Update(); foreach ($response as $indexName => $alias) { @@ -49,13 +58,35 @@ public function handle(Client $elasticsearch): void $elasticsearch->indices()->updateAliases($params->toArray()); } + /** + * {@inheritdoc} + */ public function estimate(): int { return 1; } + /** + * {@inheritdoc} + */ public function title(): string { return 'Switching to the new index'; } + + /** + * {@inheritdoc} + */ + public function advance(): int + { + return 1; + } + + /** + * {@inheritdoc} + */ + public function completed(): bool + { + return true; + } } diff --git a/src/Searchable/DefaultImportSource.php b/src/Searchable/DefaultImportSource.php index 0de226b9..439e9a54 100644 --- a/src/Searchable/DefaultImportSource.php +++ b/src/Searchable/DefaultImportSource.php @@ -1,78 +1,146 @@ */ private $scopes; + /** + * @var int + */ + private $totalChunks; + + /** + * @var int + */ + private $chunkSize; + + /** + * @var Scope|null + */ + private $chunkScope; + /** * DefaultImportSource constructor. * - * @param string $className - * @param array $scopes + * @param string $className + * @param array $scopes + * @param Scope|null $chunkScope */ - public function __construct(string $className, array $scopes = []) + public function __construct(string $className, array $scopes = [], Scope|null $chunkScope = null) { $this->className = $className; $this->scopes = $scopes; + $this->chunkScope = $chunkScope; + $this->chunkSize = self::DEFAULT_CHUNK_SIZE; + $this->totalChunks = 0; } + /** + * {@inheritdoc} + */ public function syncWithSearchUsingQueue(): ?string { return $this->model()->syncWithSearchUsingQueue(); } + /** + * {@inheritdoc} + */ public function syncWithSearchUsing(): ?string { return $this->model()->syncWithSearchUsing(); } + /** + * {@inheritdoc} + */ public function searchableAs(): string { return $this->model()->searchableAs(); } - public function chunked(): Collection + /** + * {@inheritdoc} + */ + public function getTotalChunks(): int + { + return $this->totalChunks; + } + + /** + * {@inheritdoc} + */ + public function getChunkSize(): int + { + return $this->chunkSize; + } + + /** + * {@inheritdoc} + */ + public function setChunkScope(Scope $scope): void + { + $this->chunkScope = $scope; + } + + /** + * {@inheritdoc} + */ + public function chunked(): ?ImportSource { $query = $this->newQuery(); $totalSearchables = $query->count(); if ($totalSearchables) { - $chunkSize = (int) config('scout.chunk.searchable', self::DEFAULT_CHUNK_SIZE); - $totalChunks = (int) ceil($totalSearchables / $chunkSize); - - return collect(range(1, $totalChunks))->map(function ($page) use ($chunkSize) { - $chunkScope = new PageScope($page, $chunkSize); - - return new static($this->className, array_merge($this->scopes, [$chunkScope])); - }); - } else { - return collect(); + $configChunkSize = config('scout.chunk.searchable', self::DEFAULT_CHUNK_SIZE); + $this->chunkSize = is_numeric($configChunkSize) ? intval($configChunkSize) : self::DEFAULT_CHUNK_SIZE; + $this->totalChunks = (int) ceil($totalSearchables / $this->chunkSize); + if ($this->model()->getKeyType() !== 'int') { + $this->chunkScope = new PageScope(0, $this->chunkSize); + } else { + $this->chunkScope = new FromScope(0, $this->chunkSize); + } + return $this; } + return null; } /** - * @return mixed + * @return Model|Searchable */ private function model() { + /** @var Model|Searchable */ return new $this->className; } + /** + * @return Builder<\Illuminate\Database\Eloquent\Model> + */ private function newQuery(): Builder { $query = $this->className::makeAllSearchableUsing($this->model()->newQuery()); @@ -82,7 +150,12 @@ private function newQuery(): Builder return $query->withTrashed(); }) ->orderBy($this->model()->getQualifiedKeyName()); + $scopes = $this->scopes; + + if ($this->chunkScope) { + $scopes = array_merge($scopes, [$this->chunkScope]); + } return collect($scopes)->reduce(function ($instance, $scope) { $instance->withGlobalScope(get_class($scope), $scope); @@ -91,9 +164,12 @@ private function newQuery(): Builder }, $query); } + /** + * {@inheritdoc} + */ public function get(): EloquentCollection { - /** @var EloquentCollection $models */ + /** @var EloquentCollection $models */ $models = $this->newQuery()->get(); return $models; diff --git a/src/Searchable/ImportSource.php b/src/Searchable/ImportSource.php index 874fdb53..4ae938a2 100644 --- a/src/Searchable/ImportSource.php +++ b/src/Searchable/ImportSource.php @@ -1,19 +1,52 @@ + */ public function get(): EloquentCollection; } From cde973a74ddd7cdcd217bce729cfa90570be308b Mon Sep 17 00:00:00 2001 From: Davis Puciriuss Date: Tue, 7 May 2024 13:09:36 +0300 Subject: [PATCH 04/12] Updated tests, larastan. --- src/ElasticSearch/Config/Storage.php | 38 ++++++++-- .../EloquentHitsIteratorAggregate.php | 21 ++++-- src/ElasticSearch/HitsIteratorAggregate.php | 9 ++- src/ElasticSearch/Index.php | 2 + src/ElasticSearch/Params/Bulk.php | 5 ++ src/ElasticSearch/SearchFactory.php | 2 +- src/ElasticSearchServiceProvider.php | 2 + src/Engines/ElasticSearchEngine.php | 32 +++++++-- src/Jobs/ProcessSearchable.php | 2 +- src/Searchable/SearchableListFactory.php | 16 +++-- tests/Feature/ImportCommandTest.php | 71 +++++++++++++++++-- .../Engines/ElasticSearchEngineTest.php | 14 ++++ tests/Integration/Jobs/ImportTest.php | 15 ++-- .../Jobs/Stages/PullFromSourceTest.php | 10 +-- .../Unit/ElasticSearch/Config/ConfigTest.php | 4 ++ tests/Unit/Jobs/ImportStagesTest.php | 13 ++-- .../Searchable/SearchableListFactoryTest.php | 1 + tests/laravel/app/BookWithCustomKey.php | 5 ++ .../Library/CustomHitsIteratorAggregate.php | 2 +- .../database/factories/BookFactory.php | 22 ++++++ 20 files changed, 233 insertions(+), 53 deletions(-) diff --git a/src/ElasticSearch/Config/Storage.php b/src/ElasticSearch/Config/Storage.php index 87abf2d4..fed54fb2 100644 --- a/src/ElasticSearch/Config/Storage.php +++ b/src/ElasticSearch/Config/Storage.php @@ -28,7 +28,15 @@ public static function load(string $config): Storage */ public function hosts(): array { - return explode(',', $this->loadConfig('host')); + /** @var mixed $hostConfig */ + $hostConfig = $this->loadConfig('host'); + + if (is_string($hostConfig)) { + /** @var string $hostConfig */ + return explode(',', $hostConfig); + } + + return []; } /** @@ -36,7 +44,10 @@ public function hosts(): array */ public function user(): ?string { - return $this->loadConfig('user'); + /** @var string|null $userConfig */ + $userConfig = $this->loadConfig('user'); + + return $userConfig; } /** @@ -44,7 +55,10 @@ public function user(): ?string */ public function password(): ?string { - return $this->loadConfig('password'); + /** @var string|null $passwordConfig */ + $passwordConfig = $this->loadConfig('password'); + + return $passwordConfig; } /** @@ -52,7 +66,10 @@ public function password(): ?string */ public function elasticCloudId(): ?string { - return $this->loadConfig('cloud_id'); + /** @var string|null $cloudConfig */ + $cloudConfig = $this->loadConfig('cloud_id'); + + return $cloudConfig; } /** @@ -60,7 +77,10 @@ public function elasticCloudId(): ?string */ public function apiKey(): ?string { - return $this->loadConfig('api_key'); + /** @var string|null $apiConfig */ + $apiConfig = $this->loadConfig('api_key'); + + return $apiConfig; } /** @@ -68,7 +88,13 @@ public function apiKey(): ?string */ public function queueTimeout(): ?int { - return (int) $this->loadConfig('queue.timeout') ?: null; + $queueTimeoutConfig = $this->loadConfig('queue.timeout'); + + if (is_numeric($queueTimeoutConfig)) { + return intval($queueTimeoutConfig); + } + + return null; } /** diff --git a/src/ElasticSearch/EloquentHitsIteratorAggregate.php b/src/ElasticSearch/EloquentHitsIteratorAggregate.php index 7918191c..8a5b17b6 100644 --- a/src/ElasticSearch/EloquentHitsIteratorAggregate.php +++ b/src/ElasticSearch/EloquentHitsIteratorAggregate.php @@ -4,8 +4,9 @@ use IteratorAggregate; use Laravel\Scout\Builder; -use Laravel\Scout\Searchable; use Traversable; +use Illuminate\Database\Eloquent\Model; +use Laravel\Scout\Searchable; /** * @internal @@ -41,14 +42,15 @@ public function __construct(array $results, callable $callback = null) * * @since 5.0.0 */ - public function getIterator() + public function getIterator(): Traversable { $hits = collect(); if ($this->results['hits']['total']) { + /** @var array> */ $hits = $this->results['hits']['hits']; $models = collect($hits)->groupBy('_source.__class_name') ->map(function ($results, $class) { - /** @var Searchable $model */ + /** @var Model&Searchable $model */ $model = new $class; $model->setKeyType('string'); $builder = new Builder($model, ''); @@ -60,11 +62,20 @@ public function getIterator() $builder, $results->pluck('_id')->all() ); }) - ->flatten()->keyBy(function ($model) { + ->flatten()->keyBy(function (Model|Searchable $model) { return get_class($model).'::'.$model->getScoutKey(); }); $hits = collect($hits)->map(function ($hit) use ($models) { - $key = $hit['_source']['__class_name'].'::'.$hit['_id']; + /** @var array $hit */ + if (! isset($hit['_source'], $hit['_id'])) { + return null; + } + $source = $hit['_source']; + if (! isset($source['__class_name'])) { + return null; + } + + $key = $source['__class_name'].'::'.$hit['_id']; return isset($models[$key]) ? $models[$key] : null; })->filter()->all(); diff --git a/src/ElasticSearch/HitsIteratorAggregate.php b/src/ElasticSearch/HitsIteratorAggregate.php index 20032ea4..a270c9ae 100644 --- a/src/ElasticSearch/HitsIteratorAggregate.php +++ b/src/ElasticSearch/HitsIteratorAggregate.php @@ -4,7 +4,14 @@ interface HitsIteratorAggregate extends \IteratorAggregate { + /** + * @param array $results + * @param callable|null $callback + */ public function __construct(array $results, callable $callback = null); - public function getIterator(); + /** + * {@inheritDoc} + */ + public function getIterator(): \Traversable; } diff --git a/src/ElasticSearch/Index.php b/src/ElasticSearch/Index.php index c14d0433..f4ca7e0d 100644 --- a/src/ElasticSearch/Index.php +++ b/src/ElasticSearch/Index.php @@ -94,7 +94,9 @@ public static function fromSource(ImportSource $source): Index 'number_of_replicas' => 0, ]; + /** @var array $settings */ $settings = config($settingsConfigKey, config('elasticsearch.indices.settings.default', $defaultSettings)); + /** @var array $mappings */ $mappings = config($mappingsConfigKey, config('elasticsearch.indices.mappings.default')); return new static($name, $settings, $mappings); diff --git a/src/ElasticSearch/Params/Bulk.php b/src/ElasticSearch/Params/Bulk.php index 475ece03..0a48fd8c 100644 --- a/src/ElasticSearch/Params/Bulk.php +++ b/src/ElasticSearch/Params/Bulk.php @@ -2,6 +2,9 @@ namespace Matchish\ScoutElasticSearch\ElasticSearch\Params; +use Illuminate\Database\Eloquent\Model; +use Laravel\Scout\Searchable; + /** * @internal */ @@ -27,6 +30,7 @@ public function delete($docs): void $this->delete($doc); } } else { + /** @var Model|Searchable $docs */ $this->deleteDocs[$docs->getScoutKey()] = $docs; } } @@ -98,6 +102,7 @@ public function index($docs): void $this->index($doc); } } else { + /** @var Model|Searchable $docs */ $this->indexDocs[$docs->getScoutKey()] = $docs; } } diff --git a/src/ElasticSearch/SearchFactory.php b/src/ElasticSearch/SearchFactory.php index 7e560847..4ac5ac7a 100644 --- a/src/ElasticSearch/SearchFactory.php +++ b/src/ElasticSearch/SearchFactory.php @@ -107,6 +107,6 @@ private static function hasWheres($builder): bool */ private static function hasWhereIns($builder): bool { - return isset($builder->whereIns) && ! empty($builder->whereIns); + return ! empty($builder->whereIns); } } diff --git a/src/ElasticSearchServiceProvider.php b/src/ElasticSearchServiceProvider.php index 6fb5dd87..b461b66b 100644 --- a/src/ElasticSearchServiceProvider.php +++ b/src/ElasticSearchServiceProvider.php @@ -23,10 +23,12 @@ public function register(): void $this->app->bind(Client::class, function () { $clientBuilder = ClientBuilder::create()->setHosts(Config::hosts()); if ($user = Config::user()) { + /** @var string $user */ $clientBuilder->setBasicAuthentication($user, Config::password()); } if ($cloudId = Config::elasticCloudId()) { + /** @var string $cloudId */ $clientBuilder->setElasticCloudId($cloudId) ->setApiKey(Config::apiKey()); } diff --git a/src/Engines/ElasticSearchEngine.php b/src/Engines/ElasticSearchEngine.php index 46637e14..fac54250 100644 --- a/src/Engines/ElasticSearchEngine.php +++ b/src/Engines/ElasticSearchEngine.php @@ -3,6 +3,7 @@ namespace Matchish\ScoutElasticSearch\Engines; use Elastic\Elasticsearch\Client; +use Elastic\Elasticsearch\Response\Elasticsearch; use Elastic\Elasticsearch\Exception\ServerResponseException; use Illuminate\Database\Eloquent\Collection; use Illuminate\Support\LazyCollection; @@ -15,6 +16,8 @@ use Matchish\ScoutElasticSearch\ElasticSearch\Params\Search as SearchParams; use Matchish\ScoutElasticSearch\ElasticSearch\SearchFactory; use Matchish\ScoutElasticSearch\ElasticSearch\SearchResults; +use Illuminate\Database\Eloquent\Model; +use Laravel\Scout\Searchable; use ONGR\ElasticsearchDSL\Query\MatchAllQuery; use ONGR\ElasticsearchDSL\Search; @@ -39,21 +42,29 @@ public function __construct(Client $elasticsearch) } /** - * {@inheritdoc} + * @param Collection $models */ public function update($models) { $params = new Bulk(); $params->index($models); - $response = $this->elasticsearch->bulk($params->toArray())->asArray(); + /** @var Elasticsearch $elasticResponse */ + $elasticResponse = $this->elasticsearch->bulk($params->toArray()); + $response = $elasticResponse->asArray(); if (array_key_exists('errors', $response) && $response['errors']) { - $error = new ServerResponseException(json_encode($response, JSON_PRETTY_PRINT)); + /** @var string|bool $json */ + $json = json_encode($response, JSON_PRETTY_PRINT); + if ($json === false) { + throw new \Exception('Bulk update error'); + } + /** @var string $json */ + $error = new ServerResponseException($json); throw new \Exception('Bulk update error', $error->getCode(), $error); } } /** - * {@inheritdoc} + * @param Collection $models */ public function delete($models) { @@ -63,12 +74,14 @@ public function delete($models) } /** - * {@inheritdoc} + * @param Model|Searchable $model */ public function flush($model) { $indexName = $model->searchableAs(); - $exist = $this->elasticsearch->indices()->exists(['index' => $indexName])->asBool(); + /** @var Elasticsearch $response */ + $response = $this->elasticsearch->indices()->exists(['index' => $indexName]); + $exist = $response->asBool(); if ($exist) { $body = (new Search())->addQuery(new MatchAllQuery())->toArray(); $params = new SearchParams($indexName, $body); @@ -98,10 +111,15 @@ public function paginate(BaseBuilder $builder, $perPage, $page) /** * {@inheritdoc} + * @return Collection */ public function mapIds($results) { - return collect($results['hits']['hits'])->pluck('_id'); + $hits = isset($results['hits']) ? $results['hits'] : []; + if (! isset($hits['hits'])) { + return collect(); + } + return collect($hits['hits'])->pluck('_id'); } /** diff --git a/src/Jobs/ProcessSearchable.php b/src/Jobs/ProcessSearchable.php index adc66c3c..b30c8dcb 100644 --- a/src/Jobs/ProcessSearchable.php +++ b/src/Jobs/ProcessSearchable.php @@ -48,6 +48,6 @@ public function handle(): void /** @var \Laravel\Scout\Engines\Engine $engine */ $engine = $model->searchableUsing(); - $engine->update($this->data); // @phpstan-ignore-line + $engine->update($this->data); } } \ No newline at end of file diff --git a/src/Searchable/SearchableListFactory.php b/src/Searchable/SearchableListFactory.php index c03bf3a9..372c33bb 100644 --- a/src/Searchable/SearchableListFactory.php +++ b/src/Searchable/SearchableListFactory.php @@ -9,6 +9,7 @@ use Laravel\Scout\Searchable; use PhpParser\Error; use PhpParser\Node; +use PhpParser\Node\Name; use PhpParser\Node\Stmt\Class_; use PhpParser\NodeFinder; use PhpParser\NodeTraverser; @@ -61,7 +62,7 @@ public function getErrors(): array } /** - * @return Collection + * @return Collection */ public function make(): Collection { @@ -88,7 +89,7 @@ private function find(): array private function getSearchableClasses(): array { if (self::$searchableClasses === null) { - self::$searchableClasses = $this->getProjectClasses()->filter(function ($class) { + self::$searchableClasses = $this->getProjectClasses()->filter(function (string $class) { return $this->findSearchableTraitRecursively($class); })->toArray(); } @@ -97,7 +98,7 @@ private function getSearchableClasses(): array } /** - * @return Collection + * @return Collection */ private function getProjectClasses(): Collection { @@ -106,8 +107,11 @@ private function getProjectClasses(): Collection return $node instanceof Class_; }); - return Collection::make($nodes)->map(function ($node) { - return $node->namespacedName->toCodeString(); + return Collection::make($nodes)->map(function (Class_ $node) { + $namespace = $node->namespacedName; + if ($namespace instanceof Name) { + return $namespace->toCodeString(); + } }); } @@ -130,7 +134,7 @@ private function getStmts(): array } $stmts = Collection::make($stmts)->flatten(1)->toArray(); - + /** @var \PhpParser\Node[] $stmts */ return $nodeTraverser->traverse($stmts); } diff --git a/tests/Feature/ImportCommandTest.php b/tests/Feature/ImportCommandTest.php index 32dd00ff..ee4c9df7 100644 --- a/tests/Feature/ImportCommandTest.php +++ b/tests/Feature/ImportCommandTest.php @@ -72,6 +72,35 @@ public function test_import_entites_in_queue(): void $this->assertEquals($productsAmount, $response['hits']['total']['value']); } + public function test_import_entites_in_parallel(): void + { + $this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']); + + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = random_int(1, 5); + factory(Product::class, $productsAmount)->create(); + + Product::setEventDispatcher($dispatcher); + + Artisan::call('scout:import', [ + '--parallel' => true, + ]); + + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals($productsAmount, $response['hits']['total']['value']); + } + + public function test_import_all_pages(): void { $dispatcher = Product::getEventDispatcher(); @@ -100,16 +129,48 @@ public function test_import_with_custom_key_all_pages(): void { $this->app['config']['scout.key'] = 'title'; - $dispatcher = Book::getEventDispatcher(); - Book::unsetEventDispatcher(); + $dispatcher = BookWithCustomKey::getEventDispatcher(); + BookWithCustomKey::unsetEventDispatcher(); $booksAmount = 10; - factory(Book::class, $booksAmount)->create(); + factory(BookWithCustomKey::class, $booksAmount)->create(); - Book::setEventDispatcher($dispatcher); + BookWithCustomKey::setEventDispatcher($dispatcher); - Artisan::call('scout:import'); + Artisan::call('scout:import', [ + 'searchable' => BookWithCustomKey::class, + ]); + $params = [ + 'index' => (new BookWithCustomKey())->searchableAs(), + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals($booksAmount, $response['hits']['total']['value']); + } + + public function test_import_with_custom_key_all_pages_in_parallel(): void + { + $this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']); + $this->app['config']['scout.key'] = 'title'; + + $dispatcher = BookWithCustomKey::getEventDispatcher(); + BookWithCustomKey::unsetEventDispatcher(); + + $booksAmount = 10; + + factory(BookWithCustomKey::class, $booksAmount)->create(); + + BookWithCustomKey::setEventDispatcher($dispatcher); + + Artisan::call('scout:import', [ + 'searchable' => BookWithCustomKey::class, + '--parallel' => true, + ]); $params = [ 'index' => (new BookWithCustomKey())->searchableAs(), 'body' => [ diff --git a/tests/Integration/Engines/ElasticSearchEngineTest.php b/tests/Integration/Engines/ElasticSearchEngineTest.php index f208e7f5..fc87cbc2 100644 --- a/tests/Integration/Engines/ElasticSearchEngineTest.php +++ b/tests/Integration/Engines/ElasticSearchEngineTest.php @@ -170,6 +170,20 @@ public function test_lazy_map_for_mixed_search(): void }); } + public function test_create_index(): void + { + $this->expectException(\Error::class); + $this->expectExceptionMessage('Not implemented'); + $this->engine->createIndex('products'); + } + + public function test_delete_index(): void + { + $this->expectException(\Error::class); + $this->expectExceptionMessage('Not implemented'); + $this->engine->deleteIndex('products'); + } + private function refreshIndex(string $index): void { $params = [ diff --git a/tests/Integration/Jobs/ImportTest.php b/tests/Integration/Jobs/ImportTest.php index 5e5789a4..8f73876b 100644 --- a/tests/Integration/Jobs/ImportTest.php +++ b/tests/Integration/Jobs/ImportTest.php @@ -33,13 +33,14 @@ public function test_progress_report() dispatch($job); $this->assertEquals([ - 'Clean up 1/7', - 'Create write index 2/7', - 'Indexing... 3/7', - 'Indexing... 4/7', - 'Indexing... 5/7', - 'Refreshing index 6/7', - 'Switching to the new index 7/7', + 'Clean up 1/8', + 'Create write index 2/8', + 'Indexing... 3/8', + 'Indexing... 4/8', + 'Indexing... 5/8', + 'Indexing... 6/8', + 'Refreshing index 7/8', + 'Switching to the new index 8/8', ], $output->getLogs()); } } diff --git a/tests/Integration/Jobs/Stages/PullFromSourceTest.php b/tests/Integration/Jobs/Stages/PullFromSourceTest.php index d99f7bb6..74378673 100644 --- a/tests/Integration/Jobs/Stages/PullFromSourceTest.php +++ b/tests/Integration/Jobs/Stages/PullFromSourceTest.php @@ -149,8 +149,8 @@ public function test_pull_soft_deleted() 'index' => 'products_index', 'body' => ['aliases' => ['products' => new stdClass()]], ]); - $stages = PullFromSource::chunked(DefaultImportSourceFactory::from(Product::class)); - $stages->first()->handle(); + $stage = PullFromSource::chunked(DefaultImportSourceFactory::from(Product::class)); + $stage->handle(); $this->elasticsearch->indices()->refresh([ 'index' => 'products', ]); @@ -168,9 +168,9 @@ public function test_pull_soft_deleted() public function test_no_searchables_no_chunks() { - $stages = PullFromSource::chunked(DefaultImportSourceFactory::from(Product::class)); + $stage = PullFromSource::chunked(DefaultImportSourceFactory::from(Product::class)); - $this->assertEquals(0, $stages->count()); + $this->assertEquals(null, $stage); } public function test_chunked_pull_only_one_page() @@ -185,7 +185,7 @@ public function test_chunked_pull_only_one_page() Product::setEventDispatcher($dispatcher); $chunks = PullFromSource::chunked(DefaultImportSourceFactory::from(Product::class)); - $chunks->first()->handle(); + $chunks->handle(); $this->elasticsearch->indices()->refresh([ 'index' => 'products', ]); diff --git a/tests/Unit/ElasticSearch/Config/ConfigTest.php b/tests/Unit/ElasticSearch/Config/ConfigTest.php index d45ce9e7..acac7723 100644 --- a/tests/Unit/ElasticSearch/Config/ConfigTest.php +++ b/tests/Unit/ElasticSearch/Config/ConfigTest.php @@ -14,6 +14,10 @@ public function test_parse_host(): void $config = new ScoutConfig(); $this->assertEquals(['http://localhost:9200'], $config::hosts()); + + Config::set('elasticsearch.host', 9001); + $config = new ScoutConfig(); + $this->assertEquals([], $config::hosts()); } public function test_parse_multihost(): void diff --git a/tests/Unit/Jobs/ImportStagesTest.php b/tests/Unit/Jobs/ImportStagesTest.php index cd055c71..94fdc528 100644 --- a/tests/Unit/Jobs/ImportStagesTest.php +++ b/tests/Unit/Jobs/ImportStagesTest.php @@ -20,22 +20,19 @@ public function test_no_pull_stages_if_no_searchables() $this->assertEquals(4, $stages->count()); $this->assertInstanceOf(CleanUp::class, $stages->get(0)); $this->assertInstanceOf(CreateWriteIndex::class, $stages->get(1)); - $this->assertInstanceOf(RefreshIndex::class, $stages->get(2)); - $this->assertInstanceOf(SwitchToNewAndRemoveOldIndex::class, $stages->get(3)); + $this->assertInstanceOf(RefreshIndex::class, $stages->get(3)); + $this->assertInstanceOf(SwitchToNewAndRemoveOldIndex::class, $stages->get(4)); } public function test_stages() { factory(Product::class, 10)->create(); $stages = ImportStages::fromSource(DefaultImportSourceFactory::from(Product::class)); - $this->assertEquals(8, $stages->count()); + $this->assertEquals(5, $stages->count()); $this->assertInstanceOf(CleanUp::class, $stages->get(0)); $this->assertInstanceOf(CreateWriteIndex::class, $stages->get(1)); $this->assertInstanceOf(PullFromSource::class, $stages->get(2)); - $this->assertInstanceOf(PullFromSource::class, $stages->get(3)); - $this->assertInstanceOf(PullFromSource::class, $stages->get(4)); - $this->assertInstanceOf(PullFromSource::class, $stages->get(5)); - $this->assertInstanceOf(RefreshIndex::class, $stages->get(6)); - $this->assertInstanceOf(SwitchToNewAndRemoveOldIndex::class, $stages->get(7)); + $this->assertInstanceOf(RefreshIndex::class, $stages->get(3)); + $this->assertInstanceOf(SwitchToNewAndRemoveOldIndex::class, $stages->get(4)); } } diff --git a/tests/Unit/Searchable/SearchableListFactoryTest.php b/tests/Unit/Searchable/SearchableListFactoryTest.php index 16fa3f97..66653afa 100644 --- a/tests/Unit/Searchable/SearchableListFactoryTest.php +++ b/tests/Unit/Searchable/SearchableListFactoryTest.php @@ -20,6 +20,7 @@ public function test_only_load_seachable_classes() // There are 5 searchable models: Book, BookWithCustomKey, Product, Ticket and Post $this->assertCount(5, $searchable); + $this->assertIsArray($factory->getErrors()); } public function test_find_searchable_trait_within_trait() diff --git a/tests/laravel/app/BookWithCustomKey.php b/tests/laravel/app/BookWithCustomKey.php index 5e5f5c09..eec33d88 100644 --- a/tests/laravel/app/BookWithCustomKey.php +++ b/tests/laravel/app/BookWithCustomKey.php @@ -13,6 +13,11 @@ class BookWithCustomKey extends Book 'year', ]; + public function getKeyType() + { + return 'string'; + } + public function getKeyName() { return 'custom_key'; diff --git a/tests/laravel/app/Library/CustomHitsIteratorAggregate.php b/tests/laravel/app/Library/CustomHitsIteratorAggregate.php index 99bc2ee5..6626a11c 100644 --- a/tests/laravel/app/Library/CustomHitsIteratorAggregate.php +++ b/tests/laravel/app/Library/CustomHitsIteratorAggregate.php @@ -17,7 +17,7 @@ public function __construct(array $results, callable $callback = null) $this->callback = $callback; } - public function getIterator() + public function getIterator(): \Traversable { $hits = ['test1', 'test2', 'test3']; diff --git a/tests/laravel/database/factories/BookFactory.php b/tests/laravel/database/factories/BookFactory.php index 6b3882f7..f0ce4291 100644 --- a/tests/laravel/database/factories/BookFactory.php +++ b/tests/laravel/database/factories/BookFactory.php @@ -3,6 +3,7 @@ declare(strict_types=1); use App\Book; +use App\BookWithCustomKey; use Faker\Generator as Faker; $factory->define(Book::class, function (Faker $faker) { @@ -25,3 +26,24 @@ 'title' => "{$faker->sentence} Barcelona {$faker->sentence}", ]; }); + +$factory->define(BookWithCustomKey::class, function (Faker $faker) { + return [ + 'custom_key' => $faker->uuid, + 'title' => $faker->sentence, + 'author' => $faker->name, + 'year' => $faker->year, + ]; +}); + +$factory->state(BookWithCustomKey::class, 'new-york', function (Faker $faker) { + return [ + 'title' => "{$faker->sentence} New-York {$faker->sentence}", + ]; +}); + +$factory->state(BookWithCustomKey::class, 'barcelona', function (Faker $faker) { + return [ + 'title' => "{$faker->sentence} Barcelona {$faker->sentence}", + ]; +}); From 26a797318e3a6589247cf998964237fdb6e8437a Mon Sep 17 00:00:00 2001 From: Davis Puciriuss Date: Fri, 10 May 2024 19:56:02 +0300 Subject: [PATCH 05/12] Updated env config back to original. Trackable-Job package added to composer as a suggestion. styleci. --- .env.example | 2 -- composer.json | 4 ++- src/Console/Commands/ImportCommand.php | 12 ++++----- src/ElasticSearch/Config/Storage.php | 2 +- .../EloquentHitsIteratorAggregate.php | 4 +-- src/ElasticSearch/HitsIteratorAggregate.php | 4 +-- src/Engines/ElasticSearchEngine.php | 13 +++++----- src/Jobs/Import.php | 6 ++--- src/Jobs/ImportStages.php | 4 +-- src/Jobs/ProcessSearchable.php | 9 +++---- src/Jobs/Stages/CleanUp.php | 2 +- src/Jobs/Stages/PullFromSource.php | 2 +- src/Jobs/Stages/PullFromSourceParallel.php | 26 ++++++++++--------- src/Jobs/Stages/StageInterface.php | 2 +- src/Searchable/DefaultImportSource.php | 18 +++++++------ src/Searchable/ImportSource.php | 2 +- src/Searchable/SearchableListFactory.php | 1 + tests/Feature/ImportCommandTest.php | 1 - .../Engines/ElasticSearchEngineTest.php | 2 +- tests/IntegrationTestCase.php | 4 ++- tests/TestCase.php | 14 +++++----- 21 files changed, 71 insertions(+), 63 deletions(-) diff --git a/.env.example b/.env.example index 4fa16080..43b2d8d0 100644 --- a/.env.example +++ b/.env.example @@ -2,8 +2,6 @@ APP_ENV=development XDEBUG_CONFIG=remote_port=9001 remote_host=172.17.0.1 remote_autostart=1 remote_log=/home/user/xdebug.log PHP_IDE_CONFIG=serverName=Scout ELASTICSEARCH_HOST=elasticsearch:9200 -ELASTICSEARCH_USER=elasticsearch -ELASTICSEARCH_PASSWORD=password DB_HOST=db DB_PORT=3306 DB_DATABASE=my_database diff --git a/composer.json b/composer.json index 96bc49a8..2d062337 100644 --- a/composer.json +++ b/composer.json @@ -20,7 +20,6 @@ "elasticsearch/elasticsearch": "^8.0", "handcraftedinthealps/elasticsearch-dsl": "^8.0", "laravel/scout": "^8.0|^9.0|^10.0", - "mateusjunges/laravel-trackable-jobs": "^1.6", "roave/better-reflection": "^4.3|^5.0|^6.18" }, "require-dev": { @@ -50,6 +49,9 @@ ] } }, + "suggest": { + "mateusjunges/laravel-trackable-jobs": "Required when using --parallel option in ImportCommand." + }, "config": { "sort-packages": true, "preferred-install": "dist" diff --git a/src/Console/Commands/ImportCommand.php b/src/Console/Commands/ImportCommand.php index bc504689..289f3eeb 100644 --- a/src/Console/Commands/ImportCommand.php +++ b/src/Console/Commands/ImportCommand.php @@ -31,7 +31,7 @@ public function handle(): void { $parallel = false; - if($this->option('parallel')) { + if ($this->option('parallel')) { $parallel = true; } @@ -42,7 +42,7 @@ public function handle(): void } /** - * @param array $argument + * @param array $argument * * @return Collection */ @@ -56,8 +56,8 @@ private function searchableList(array $argument): Collection } /** - * @param string $searchable - * @param bool $parallel + * @param string $searchable + * @param bool $parallel * * @return void */ @@ -68,7 +68,7 @@ private function import(string $searchable, bool $parallel): void $job = new Import($source); /** @var int|null $queueTimeout */ $queueTimeout = Config::queueTimeout(); - if($queueTimeout !== null) { + if ($queueTimeout !== null) { $job->timeout = (int) $queueTimeout; } $job->parallel = $parallel; @@ -77,7 +77,7 @@ private function import(string $searchable, bool $parallel): void $job = (new QueueableJob())->chain([$job]); /** @var int|null $queueTimeout */ $queueTimeout = Config::queueTimeout(); - if($queueTimeout !== null) { + if ($queueTimeout !== null) { $job->timeout = (int) $queueTimeout; } } diff --git a/src/ElasticSearch/Config/Storage.php b/src/ElasticSearch/Config/Storage.php index fed54fb2..0bc427c1 100644 --- a/src/ElasticSearch/Config/Storage.php +++ b/src/ElasticSearch/Config/Storage.php @@ -93,7 +93,7 @@ public function queueTimeout(): ?int if (is_numeric($queueTimeoutConfig)) { return intval($queueTimeoutConfig); } - + return null; } diff --git a/src/ElasticSearch/EloquentHitsIteratorAggregate.php b/src/ElasticSearch/EloquentHitsIteratorAggregate.php index 8a5b17b6..938453cc 100644 --- a/src/ElasticSearch/EloquentHitsIteratorAggregate.php +++ b/src/ElasticSearch/EloquentHitsIteratorAggregate.php @@ -2,11 +2,11 @@ namespace Matchish\ScoutElasticSearch\ElasticSearch; +use Illuminate\Database\Eloquent\Model; use IteratorAggregate; use Laravel\Scout\Builder; -use Traversable; -use Illuminate\Database\Eloquent\Model; use Laravel\Scout\Searchable; +use Traversable; /** * @internal diff --git a/src/ElasticSearch/HitsIteratorAggregate.php b/src/ElasticSearch/HitsIteratorAggregate.php index a270c9ae..042e03c6 100644 --- a/src/ElasticSearch/HitsIteratorAggregate.php +++ b/src/ElasticSearch/HitsIteratorAggregate.php @@ -5,8 +5,8 @@ interface HitsIteratorAggregate extends \IteratorAggregate { /** - * @param array $results - * @param callable|null $callback + * @param array $results + * @param callable|null $callback */ public function __construct(array $results, callable $callback = null); diff --git a/src/Engines/ElasticSearchEngine.php b/src/Engines/ElasticSearchEngine.php index fac54250..ea575086 100644 --- a/src/Engines/ElasticSearchEngine.php +++ b/src/Engines/ElasticSearchEngine.php @@ -3,21 +3,21 @@ namespace Matchish\ScoutElasticSearch\Engines; use Elastic\Elasticsearch\Client; -use Elastic\Elasticsearch\Response\Elasticsearch; use Elastic\Elasticsearch\Exception\ServerResponseException; +use Elastic\Elasticsearch\Response\Elasticsearch; use Illuminate\Database\Eloquent\Collection; +use Illuminate\Database\Eloquent\Model; use Illuminate\Support\LazyCollection; use Laravel\Scout\Builder; use Laravel\Scout\Builder as BaseBuilder; use Laravel\Scout\Engines\Engine; +use Laravel\Scout\Searchable; use Matchish\ScoutElasticSearch\ElasticSearch\HitsIteratorAggregate; use Matchish\ScoutElasticSearch\ElasticSearch\Params\Bulk; use Matchish\ScoutElasticSearch\ElasticSearch\Params\Indices\Refresh; use Matchish\ScoutElasticSearch\ElasticSearch\Params\Search as SearchParams; use Matchish\ScoutElasticSearch\ElasticSearch\SearchFactory; use Matchish\ScoutElasticSearch\ElasticSearch\SearchResults; -use Illuminate\Database\Eloquent\Model; -use Laravel\Scout\Searchable; use ONGR\ElasticsearchDSL\Query\MatchAllQuery; use ONGR\ElasticsearchDSL\Search; @@ -42,7 +42,7 @@ public function __construct(Client $elasticsearch) } /** - * @param Collection $models + * @param Collection $models */ public function update($models) { @@ -64,7 +64,7 @@ public function update($models) } /** - * @param Collection $models + * @param Collection $models */ public function delete($models) { @@ -74,7 +74,7 @@ public function delete($models) } /** - * @param Model|Searchable $model + * @param Model|Searchable $model */ public function flush($model) { @@ -111,6 +111,7 @@ public function paginate(BaseBuilder $builder, $perPage, $page) /** * {@inheritdoc} + * * @return Collection */ public function mapIds($results) diff --git a/src/Jobs/Import.php b/src/Jobs/Import.php index c176d11b..65985aee 100644 --- a/src/Jobs/Import.php +++ b/src/Jobs/Import.php @@ -22,7 +22,7 @@ final class Import */ private $source; /** - * @var boolean + * @var bool */ public $parallel = false; @@ -52,8 +52,8 @@ public function handle(Client $elasticsearch): void $this->progressBar()->setMessage($currentStage->title()); $currentStage->handle($elasticsearch); $this->progressBar()->advance($currentStage->advance()); - if($currentStage->completed()) { - if($stages->isEmpty()) { + if ($currentStage->completed()) { + if ($stages->isEmpty()) { /** @var null $currentStage */ $currentStage = null; } else { diff --git a/src/Jobs/ImportStages.php b/src/Jobs/ImportStages.php index 12a7b35a..f0bee7f1 100644 --- a/src/Jobs/ImportStages.php +++ b/src/Jobs/ImportStages.php @@ -3,13 +3,13 @@ namespace Matchish\ScoutElasticSearch\Jobs; use Illuminate\Support\Collection; -use Matchish\ScoutElasticSearch\Jobs\Stages\StageInterface; use Matchish\ScoutElasticSearch\ElasticSearch\Index; use Matchish\ScoutElasticSearch\Jobs\Stages\CleanUp; use Matchish\ScoutElasticSearch\Jobs\Stages\CreateWriteIndex; use Matchish\ScoutElasticSearch\Jobs\Stages\PullFromSource; use Matchish\ScoutElasticSearch\Jobs\Stages\PullFromSourceParallel; use Matchish\ScoutElasticSearch\Jobs\Stages\RefreshIndex; +use Matchish\ScoutElasticSearch\Jobs\Stages\StageInterface; use Matchish\ScoutElasticSearch\Jobs\Stages\SwitchToNewAndRemoveOldIndex; use Matchish\ScoutElasticSearch\Searchable\ImportSource; @@ -28,7 +28,7 @@ public static function fromSource(ImportSource $source, bool $parallel = false) { $index = Index::fromSource($source); - if($parallel) { + if ($parallel && class_exists(\Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class)) { return (new self([ new CleanUp($source), new CreateWriteIndex($source, $index), diff --git a/src/Jobs/ProcessSearchable.php b/src/Jobs/ProcessSearchable.php index b30c8dcb..fb753d0e 100644 --- a/src/Jobs/ProcessSearchable.php +++ b/src/Jobs/ProcessSearchable.php @@ -4,15 +4,14 @@ namespace Matchish\ScoutElasticSearch\Jobs; - -use Illuminate\Support\Collection; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; +use Illuminate\Database\Eloquent\Model; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; +use Illuminate\Support\Collection; use Junges\TrackableJobs\Concerns\Trackable; -use Illuminate\Database\Eloquent\Model; use Laravel\Scout\Searchable; class ProcessSearchable implements ShouldQueue @@ -27,7 +26,7 @@ class ProcessSearchable implements ShouldQueue private $data; /** - * @param Collection $data + * @param Collection $data */ public function __construct(Collection $data) { @@ -37,7 +36,7 @@ public function __construct(Collection $data) /** * Handles the job execution. - * + * * @return void */ public function handle(): void diff --git a/src/Jobs/Stages/CleanUp.php b/src/Jobs/Stages/CleanUp.php index 7258461f..238ee2dc 100644 --- a/src/Jobs/Stages/CleanUp.php +++ b/src/Jobs/Stages/CleanUp.php @@ -3,8 +3,8 @@ namespace Matchish\ScoutElasticSearch\Jobs\Stages; use Elastic\Elasticsearch\Client; -use Elastic\Elasticsearch\Response\Elasticsearch; use Elastic\Elasticsearch\Exception\ClientResponseException; +use Elastic\Elasticsearch\Response\Elasticsearch; use Matchish\ScoutElasticSearch\ElasticSearch\Params\Indices\Alias\Get as GetAliasParams; use Matchish\ScoutElasticSearch\ElasticSearch\Params\Indices\Delete as DeleteIndexParams; use Matchish\ScoutElasticSearch\Searchable\ImportSource; diff --git a/src/Jobs/Stages/PullFromSource.php b/src/Jobs/Stages/PullFromSource.php index b09ff351..7d901dc9 100644 --- a/src/Jobs/Stages/PullFromSource.php +++ b/src/Jobs/Stages/PullFromSource.php @@ -71,7 +71,7 @@ public function completed(): bool public static function chunked(ImportSource $source): ?PullFromSource { $source = $source->chunked(); - if($source === null) { + if ($source === null) { return null; } return new static($source); diff --git a/src/Jobs/Stages/PullFromSourceParallel.php b/src/Jobs/Stages/PullFromSourceParallel.php index 427d27b1..6cc3d6a1 100644 --- a/src/Jobs/Stages/PullFromSourceParallel.php +++ b/src/Jobs/Stages/PullFromSourceParallel.php @@ -4,8 +4,8 @@ use Elastic\Elasticsearch\Client; use Junges\TrackableJobs\Models\TrackedJob; -use Matchish\ScoutElasticSearch\Database\Scopes\PageScope; use Matchish\ScoutElasticSearch\Database\Scopes\FromScope; +use Matchish\ScoutElasticSearch\Database\Scopes\PageScope; use Matchish\ScoutElasticSearch\Jobs\ProcessSearchable; use Matchish\ScoutElasticSearch\Searchable\ImportSource; @@ -33,7 +33,7 @@ final class PullFromSourceParallel implements StageInterface * @var array */ private $dispatchedJobIds = []; - + /** * @var int */ @@ -52,7 +52,7 @@ final class PullFromSourceParallel implements StageInterface public function __construct(ImportSource $source) { $this->source = $source; - $this->queues = collect(config('scout.chunk.handlers', self::DEFAULT_HANDLER_COUNT))->map(function($i) { + $this->queues = collect(config('scout.chunk.handlers', self::DEFAULT_HANDLER_COUNT))->map(function ($i) { return 'import_'.$i; })->toArray(); } @@ -65,6 +65,7 @@ private function getNextQueue(): string /** @var string $queue */ $queue = array_shift($this->queues); $this->queues[] = $queue; + return $queue; } @@ -73,22 +74,22 @@ private function getNextQueue(): string */ public function handle(Client $elasticsearch = null): void { - if(count($this->dispatchedJobIds) > 0) { + if (count($this->dispatchedJobIds) > 0) { $jobs = TrackedJob::findMany($this->dispatchedJobIds); - $finishedJobs = $jobs->filter(function($job) { + $finishedJobs = $jobs->filter(function ($job) { return $job->status === TrackedJob::STATUS_FINISHED; }); $this->handledJobs = array_merge($this->handledJobs, $finishedJobs->pluck('id')->toArray()); $this->advanceBy += $finishedJobs->count(); - $this->dispatchedJobIds = $jobs->filter(function($job) { + $this->dispatchedJobIds = $jobs->filter(function ($job) { return $job->status !== TrackedJob::STATUS_FINISHED; })->pluck('id')->toArray(); } - - if(count($this->dispatchedJobIds) >= $this->source->getTotalChunks()) { + + if (count($this->dispatchedJobIds) >= $this->source->getTotalChunks()) { return; } - + $results = $this->source->get()->filter->shouldBeSearchable(); if (! $results->isEmpty()) { @@ -98,7 +99,7 @@ public function handle(Client $elasticsearch = null): void if ($results->first()->getKeyType() !== 'int') { $this->source->setChunkScope( new PageScope( - count($this->handledJobs) + count($this->dispatchedJobIds), + count($this->handledJobs) + count($this->dispatchedJobIds), $this->source->getChunkSize()) ); } else { @@ -122,6 +123,7 @@ public function advance(): int { $advance = $this->advanceBy; $this->advanceBy = 0; + return $advance; } @@ -149,10 +151,10 @@ public static function chunked(ImportSource $source): PullFromSourceParallel|nul { $source = $source->chunked(); - if($source === null) { + if ($source === null) { return null; } - + return new static($source); } } diff --git a/src/Jobs/Stages/StageInterface.php b/src/Jobs/Stages/StageInterface.php index 126cee4a..04157214 100644 --- a/src/Jobs/Stages/StageInterface.php +++ b/src/Jobs/Stages/StageInterface.php @@ -27,7 +27,7 @@ public function advance(): int; public function completed(): bool; /** - * @param Client $elasticsearch + * @param Client $elasticsearch * @return void */ public function handle(Client $elasticsearch): void; diff --git a/src/Searchable/DefaultImportSource.php b/src/Searchable/DefaultImportSource.php index 439e9a54..1622d181 100644 --- a/src/Searchable/DefaultImportSource.php +++ b/src/Searchable/DefaultImportSource.php @@ -6,11 +6,11 @@ use Illuminate\Database\Eloquent\Builder; use Illuminate\Database\Eloquent\Collection as EloquentCollection; -use Matchish\ScoutElasticSearch\Database\Scopes\FromScope; -use Matchish\ScoutElasticSearch\Database\Scopes\PageScope; -use Illuminate\Database\Eloquent\Scope; use Illuminate\Database\Eloquent\Model; +use Illuminate\Database\Eloquent\Scope; use Laravel\Scout\Searchable; +use Matchish\ScoutElasticSearch\Database\Scopes\FromScope; +use Matchish\ScoutElasticSearch\Database\Scopes\PageScope; final class DefaultImportSource implements ImportSource { @@ -47,9 +47,9 @@ final class DefaultImportSource implements ImportSource /** * DefaultImportSource constructor. * - * @param string $className - * @param array $scopes - * @param Scope|null $chunkScope + * @param string $className + * @param array $scopes + * @param Scope|null $chunkScope */ public function __construct(string $className, array $scopes = [], Scope|null $chunkScope = null) { @@ -124,8 +124,10 @@ public function chunked(): ?ImportSource } else { $this->chunkScope = new FromScope(0, $this->chunkSize); } + return $this; } + return null; } @@ -150,9 +152,9 @@ private function newQuery(): Builder return $query->withTrashed(); }) ->orderBy($this->model()->getQualifiedKeyName()); - + $scopes = $this->scopes; - + if ($this->chunkScope) { $scopes = array_merge($scopes, [$this->chunkScope]); } diff --git a/src/Searchable/ImportSource.php b/src/Searchable/ImportSource.php index 4ae938a2..405cc01b 100644 --- a/src/Searchable/ImportSource.php +++ b/src/Searchable/ImportSource.php @@ -30,7 +30,7 @@ public function searchableAs(): string; public function chunked(): ?ImportSource; /** - * @param Scope $scope + * @param Scope $scope * @return void */ public function setChunkScope(Scope $scope): void; diff --git a/src/Searchable/SearchableListFactory.php b/src/Searchable/SearchableListFactory.php index 372c33bb..f28a31ff 100644 --- a/src/Searchable/SearchableListFactory.php +++ b/src/Searchable/SearchableListFactory.php @@ -134,6 +134,7 @@ private function getStmts(): array } $stmts = Collection::make($stmts)->flatten(1)->toArray(); + /** @var \PhpParser\Node[] $stmts */ return $nodeTraverser->traverse($stmts); } diff --git a/tests/Feature/ImportCommandTest.php b/tests/Feature/ImportCommandTest.php index ee4c9df7..bf0d67ff 100644 --- a/tests/Feature/ImportCommandTest.php +++ b/tests/Feature/ImportCommandTest.php @@ -87,7 +87,6 @@ public function test_import_entites_in_parallel(): void Artisan::call('scout:import', [ '--parallel' => true, ]); - $params = [ 'index' => 'products', 'body' => [ diff --git a/tests/Integration/Engines/ElasticSearchEngineTest.php b/tests/Integration/Engines/ElasticSearchEngineTest.php index fc87cbc2..c21e6514 100644 --- a/tests/Integration/Engines/ElasticSearchEngineTest.php +++ b/tests/Integration/Engines/ElasticSearchEngineTest.php @@ -176,7 +176,7 @@ public function test_create_index(): void $this->expectExceptionMessage('Not implemented'); $this->engine->createIndex('products'); } - + public function test_delete_index(): void { $this->expectException(\Error::class); diff --git a/tests/IntegrationTestCase.php b/tests/IntegrationTestCase.php index 1cd5d7bc..f4bc4249 100644 --- a/tests/IntegrationTestCase.php +++ b/tests/IntegrationTestCase.php @@ -32,7 +32,9 @@ protected function getEnvironmentSetUp($app) parent::getEnvironmentSetUp($app); $app['config']->set('elasticsearch', require(__DIR__.'/../config/elasticsearch.php')); - $app['config']->set('trackable-job', require(__DIR__.'/../vendor/mateusjunges/laravel-trackable-jobs/config/trackable-jobs.php')); + if (class_exists(\Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class)) { + $app['config']->set('trackable-job', require(__DIR__.'/../vendor/mateusjunges/laravel-trackable-jobs/config/trackable-jobs.php')); + } $app['config']->set('elasticsearch.indices.mappings.products', [ 'properties' => [ 'type' => [ diff --git a/tests/TestCase.php b/tests/TestCase.php index 6e695fe8..0dfbc6be 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -8,7 +8,6 @@ use Matchish\ScoutElasticSearch\ElasticSearchServiceProvider; use Matchish\ScoutElasticSearch\Engines\ElasticSearchEngine; use Matchish\ScoutElasticSearch\ScoutElasticSearchServiceProvider; -use Junges\TrackableJobs\Providers\TrackableJobsServiceProvider; use Orchestra\Testbench\TestCase as BaseTestCase; abstract class TestCase extends BaseTestCase @@ -38,18 +37,21 @@ protected function getEnvironmentSetUp($app) $app['config']->set('scout.queue', false); // Setup default database to use sqlite :memory: $app['config']->set('database.default', 'mysql'); - $app['config']->set('elasticsearch.host', 'elasticsearch:9200'); - $app['config']->set('elasticsearch.user', 'elasticsearch'); - $app['config']->set('elasticsearch.password', 'password'); } protected function getPackageProviders($app) { - return [ + /** @var array $providers */ + $providers = [ ScoutServiceProvider::class, ScoutElasticSearchServiceProvider::class, ElasticSearchServiceProvider::class, - TrackableJobsServiceProvider::class, ]; + + if (class_exists(\Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class)) { + $providers[] = \Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class; + } + + return $providers; } } From 256bd0c6b3ee4dea012e0719c8858e3f45ca38b0 Mon Sep 17 00:00:00 2001 From: Davis Puciriuss Date: Mon, 13 May 2024 13:45:12 +0300 Subject: [PATCH 06/12] Updated and added some tests. Added tracked-jobs package to github/workflows. --- .github/workflows/test-application.yaml | 1 + src/Console/Commands/ImportCommand.php | 2 - src/Engines/ElasticSearchEngine.php | 2 +- src/Jobs/ImportStages.php | 1 - src/Jobs/Stages/PullFromSource.php | 1 + src/Searchable/DefaultImportSource.php | 2 +- tests/Feature/ImportCommandTest.php | 71 +++++- tests/Integration/Jobs/ImportTest.php | 33 +++ .../Stages/PullFromSourceParallelTest.php | 204 ++++++++++++++++++ .../Searchable/DefaultImportSourceTest.php | 17 ++ tests/TestCase.php | 6 + ...5_02_000002_create_tracked_jobs_table.php} | 0 12 files changed, 334 insertions(+), 6 deletions(-) create mode 100644 tests/Integration/Jobs/Stages/PullFromSourceParallelTest.php rename tests/laravel/database/migrations/{2019_15_02_000002_laravel_trackable_create_tracked_jobs_table.php => 2019_15_02_000002_create_tracked_jobs_table.php} (100%) diff --git a/.github/workflows/test-application.yaml b/.github/workflows/test-application.yaml index ba709e9e..e18601fa 100644 --- a/.github/workflows/test-application.yaml +++ b/.github/workflows/test-application.yaml @@ -65,6 +65,7 @@ jobs: run: | composer validate --strict composer install --no-interaction --prefer-dist + composer require mateusjunges/laravel-trackable-jobs --no-interaction - name: Run tests run: vendor/bin/phpunit --coverage-clover=coverage.xml diff --git a/src/Console/Commands/ImportCommand.php b/src/Console/Commands/ImportCommand.php index 289f3eeb..9e09b849 100644 --- a/src/Console/Commands/ImportCommand.php +++ b/src/Console/Commands/ImportCommand.php @@ -43,7 +43,6 @@ public function handle(): void /** * @param array $argument - * * @return Collection */ private function searchableList(array $argument): Collection @@ -58,7 +57,6 @@ private function searchableList(array $argument): Collection /** * @param string $searchable * @param bool $parallel - * * @return void */ private function import(string $searchable, bool $parallel): void diff --git a/src/Engines/ElasticSearchEngine.php b/src/Engines/ElasticSearchEngine.php index ea575086..e43a9acb 100644 --- a/src/Engines/ElasticSearchEngine.php +++ b/src/Engines/ElasticSearchEngine.php @@ -111,7 +111,7 @@ public function paginate(BaseBuilder $builder, $perPage, $page) /** * {@inheritdoc} - * + * * @return Collection */ public function mapIds($results) diff --git a/src/Jobs/ImportStages.php b/src/Jobs/ImportStages.php index f0bee7f1..b15a776e 100644 --- a/src/Jobs/ImportStages.php +++ b/src/Jobs/ImportStages.php @@ -21,7 +21,6 @@ class ImportStages extends Collection /** * @param ImportSource $source * @param bool $parallel - * * @return Collection */ public static function fromSource(ImportSource $source, bool $parallel = false) diff --git a/src/Jobs/Stages/PullFromSource.php b/src/Jobs/Stages/PullFromSource.php index 7d901dc9..6e8e3d13 100644 --- a/src/Jobs/Stages/PullFromSource.php +++ b/src/Jobs/Stages/PullFromSource.php @@ -74,6 +74,7 @@ public static function chunked(ImportSource $source): ?PullFromSource if ($source === null) { return null; } + return new static($source); } } diff --git a/src/Searchable/DefaultImportSource.php b/src/Searchable/DefaultImportSource.php index 1622d181..c5943b70 100644 --- a/src/Searchable/DefaultImportSource.php +++ b/src/Searchable/DefaultImportSource.php @@ -57,7 +57,7 @@ public function __construct(string $className, array $scopes = [], Scope|null $c $this->scopes = $scopes; $this->chunkScope = $chunkScope; $this->chunkSize = self::DEFAULT_CHUNK_SIZE; - $this->totalChunks = 0; + $this->totalChunks = 1; } /** diff --git a/tests/Feature/ImportCommandTest.php b/tests/Feature/ImportCommandTest.php index bf0d67ff..1dd52891 100644 --- a/tests/Feature/ImportCommandTest.php +++ b/tests/Feature/ImportCommandTest.php @@ -99,7 +99,6 @@ public function test_import_entites_in_parallel(): void $this->assertEquals($productsAmount, $response['hits']['total']['value']); } - public function test_import_all_pages(): void { $dispatcher = Product::getEventDispatcher(); @@ -124,6 +123,32 @@ public function test_import_all_pages(): void $this->assertEquals($productsAmount, $response['hits']['total']['value']); } + public function test_import_all_pages_in_parallel(): void + { + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = 10; + + factory(Product::class, $productsAmount)->create(); + + Product::setEventDispatcher($dispatcher); + + Artisan::call('scout:import', [ + '--parallel' => true + ]); + $params = [ + 'index' => (new Product())->searchableAs(), + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals($productsAmount, $response['hits']['total']['value']); + } + public function test_import_with_custom_key_all_pages(): void { $this->app['config']['scout.key'] = 'title'; @@ -209,6 +234,35 @@ public function test_remove_old_index_after_switching_to_new(): void $this->assertFalse($this->elasticsearch->indices()->exists(['index' => 'products_old'])->asBool(), 'Old index must be deleted'); } + public function test_remove_old_index_after_switching_to_new_in_parallel_import(): void + { + $params = [ + 'index' => 'products_old', + 'body' => [ + 'aliases' => ['products' => new stdClass()], + 'settings' => [ + 'number_of_shards' => 1, + 'number_of_replicas' => 0, + ], + ], + ]; + $this->elasticsearch->indices()->create($params); + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = random_int(1, 5); + + factory(Product::class, $productsAmount)->create(); + + Product::setEventDispatcher($dispatcher); + + Artisan::call('scout:import', [ + '--parallel' => true + ]); + + $this->assertFalse($this->elasticsearch->indices()->exists(['index' => 'products_old'])->asBool(), 'Old index must be deleted'); + } + public function test_progress_report(): void { $output = new BufferedOutput(); @@ -246,6 +300,21 @@ public function test_progress_report_in_queue(): void $this->assertContains('[OK] '.trans('scout::import.done.queue', ['searchable' => Product::class]), $output); } + public function test_progress_report_in_parallel(): void + { + $this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']); + + $output = new BufferedOutput(); + Artisan::call('scout:import', [ + '--parallel' => true + ], $output); + + $output = array_map('trim', explode("\n", $output->fetch())); + + $this->assertContains(trans('scout::import.start', ['searchable' => Product::class]), $output); + $this->assertContains('[OK] '.trans('scout::import.done.queue', ['searchable' => Product::class]), $output); + } + public function test_queue_timeout_configuration(): void { Bus::fake([ diff --git a/tests/Integration/Jobs/ImportTest.php b/tests/Integration/Jobs/ImportTest.php index 8f73876b..b909423c 100644 --- a/tests/Integration/Jobs/ImportTest.php +++ b/tests/Integration/Jobs/ImportTest.php @@ -43,4 +43,37 @@ public function test_progress_report() 'Switching to the new index 8/8', ], $output->getLogs()); } + + public function test_progress_report_parallel() + { + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + $productsAmount = 7; + factory(Product::class, $productsAmount)->create(); + Product::setEventDispatcher($dispatcher); + + $job = new Import(DefaultImportSourceFactory::from(Product::class)); + $job->parallel = true; + $output = new DummyOutput(); + $outputStyle = new OutputStyle(new ArrayInput([]), $output); + $progressBar = $outputStyle->createProgressBar(); + $progressBar->setRedrawFrequency(1); + $progressBar->maxSecondsBetweenRedraws(0); + $progressBar->minSecondsBetweenRedraws(0); + $progressBar->setFormat('[%message%] %current%/%max%'); + $job->withProgressReport($progressBar); + + dispatch($job); + + $this->assertEquals([ + 'Clean up 1/7', + 'Create write index 2/7', + 'Indexing... 2/7', + 'Indexing... 3/7', + 'Indexing... 4/7', + 'Indexing... 5/7', + 'Refreshing index 6/7', + 'Switching to the new index 7/7', + ], $output->getLogs()); + } } diff --git a/tests/Integration/Jobs/Stages/PullFromSourceParallelTest.php b/tests/Integration/Jobs/Stages/PullFromSourceParallelTest.php new file mode 100644 index 00000000..3c135235 --- /dev/null +++ b/tests/Integration/Jobs/Stages/PullFromSourceParallelTest.php @@ -0,0 +1,204 @@ +create(); + + Product::setEventDispatcher($dispatcher); + $this->elasticsearch->indices()->create([ + 'index' => 'products_index', + 'body' => ['aliases' => ['products' => new stdClass()]], + ]); + $stage = new PullFromSourceParallel(DefaultImportSourceFactory::from(Product::class)); + $stage->handle(); + $this->elasticsearch->indices()->refresh([ + 'index' => 'products', + ]); + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals($productsAmount, $response['hits']['total']['value']); + } + + public function test_dont_put_entities_if_no_entities_in_collection(): void + { + $this->elasticsearch->indices()->create([ + 'index' => 'products_index', + 'body' => ['aliases' => ['products' => new stdClass()]], + ]); + $stage = new PullFromSourceParallel(DefaultImportSourceFactory::from(Product::class)); + $stage->handle(); + $this->elasticsearch->indices()->refresh([ + 'index' => 'products', + ]); + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals(0, $response['hits']['total']['value']); + } + + public function test_put_all_to_index_if_amount_of_entities_more_than_chunk_size(): void + { + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = 20; + $this->app['config']->set('scout.chunk.searchable', 5); + + factory(Product::class, $productsAmount)->create(); + + Product::setEventDispatcher($dispatcher); + $this->elasticsearch->indices()->create([ + 'index' => 'products_index', + 'body' => ['aliases' => ['products' => new stdClass()]], + ]); + $stage = new PullFromSourceParallel(DefaultImportSourceFactory::from(Product::class)); + $stage->handle(); + $this->elasticsearch->indices()->refresh([ + 'index' => 'products', + ]); + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals($productsAmount, $response['hits']['total']['value']); + } + + public function test_pull_soft_delete_meta_data() + { + $this->app['config']['scout.soft_delete'] = true; + + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = rand(1, 5); + + factory(Product::class, $productsAmount)->create(); + + Product::setEventDispatcher($dispatcher); + $this->elasticsearch->indices()->create([ + 'index' => 'products_index', + 'body' => ['aliases' => ['products' => new stdClass()]], + ]); + $stage = new PullFromSourceParallel(DefaultImportSourceFactory::from(Product::class)); + $stage->handle(); + $this->elasticsearch->indices()->refresh([ + 'index' => 'products', + ]); + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals(0, $response['hits']['hits'][0]['_source']['__soft_deleted']); + } + + public function test_pull_soft_deleted() + { + $this->app['config']['scout.soft_delete'] = true; + + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = 3; + + factory(Product::class, $productsAmount)->create(); + + Product::limit(1)->get()->first()->delete(); + + Product::setEventDispatcher($dispatcher); + $this->elasticsearch->indices()->create([ + 'index' => 'products_index', + 'body' => ['aliases' => ['products' => new stdClass()]], + ]); + $stage = PullFromSourceParallel::chunked(DefaultImportSourceFactory::from(Product::class)); + $stage->handle(); + $this->elasticsearch->indices()->refresh([ + 'index' => 'products', + ]); + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals(3, $response['hits']['total']['value']); + } + + public function test_no_searchables_no_chunks() + { + $stage = PullFromSourceParallel::chunked(DefaultImportSourceFactory::from(Product::class)); + + $this->assertEquals(null, $stage); + } + + public function test_chunked_pull_only_one_page() + { + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = 5; + + factory(Product::class, $productsAmount)->create(); + + Product::setEventDispatcher($dispatcher); + + $chunks = PullFromSourceParallel::chunked(DefaultImportSourceFactory::from(Product::class)); + $chunks->handle(); + $this->elasticsearch->indices()->refresh([ + 'index' => 'products', + ]); + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + + $this->assertEquals(3, $response['hits']['total']['value']); + } +} diff --git a/tests/Integration/Searchable/DefaultImportSourceTest.php b/tests/Integration/Searchable/DefaultImportSourceTest.php index caa8eaf4..94d283da 100644 --- a/tests/Integration/Searchable/DefaultImportSourceTest.php +++ b/tests/Integration/Searchable/DefaultImportSourceTest.php @@ -27,6 +27,23 @@ public function test_new_query_has_injected_scopes() $products = $source->get(); $this->assertEquals($iphonePromoUsedAmount, $products->count()); } + + public function test_new_query_has_injected_chunk_scope() + { + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $iphonePromoUsedAmount = rand(1, 5); + $iphonePromoNewAmount = rand(6, 10); + + factory(Product::class, $iphonePromoUsedAmount)->states(['iphone', 'promo', 'used'])->create(); + factory(Product::class, $iphonePromoNewAmount)->states(['iphone', 'promo', 'new'])->create(); + + Product::setEventDispatcher($dispatcher); + $source = new DefaultImportSource(Product::class, [], new UsedScope()); + $products = $source->get(); + $this->assertEquals($iphonePromoUsedAmount, $products->count()); + } } class UsedScope implements Scope diff --git a/tests/TestCase.php b/tests/TestCase.php index 0dfbc6be..b3899a9b 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -20,6 +20,12 @@ public function setUp(): void $this->withFactories(database_path('factories')); + if (class_exists(\Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class)) { + \Artisan::call('vendor:publish', [ + '--tag' => 'trackable-jobs-config' + ]); + } + \Artisan::call('migrate:fresh', ['--database' => 'mysql']); } diff --git a/tests/laravel/database/migrations/2019_15_02_000002_laravel_trackable_create_tracked_jobs_table.php b/tests/laravel/database/migrations/2019_15_02_000002_create_tracked_jobs_table.php similarity index 100% rename from tests/laravel/database/migrations/2019_15_02_000002_laravel_trackable_create_tracked_jobs_table.php rename to tests/laravel/database/migrations/2019_15_02_000002_create_tracked_jobs_table.php From 3730b07e682101878f2409bd6448c4cd828b99d5 Mon Sep 17 00:00:00 2001 From: Davis Puciriuss Date: Mon, 13 May 2024 13:54:10 +0300 Subject: [PATCH 07/12] Clear tracked_jobs table after handling PullFromSourceParallel. --- src/Jobs/Stages/PullFromSourceParallel.php | 11 ++++++++++- tests/Integration/Jobs/Stages/PullFromSourceTest.php | 1 + 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/Jobs/Stages/PullFromSourceParallel.php b/src/Jobs/Stages/PullFromSourceParallel.php index 6cc3d6a1..2df8791f 100644 --- a/src/Jobs/Stages/PullFromSourceParallel.php +++ b/src/Jobs/Stages/PullFromSourceParallel.php @@ -140,7 +140,16 @@ public function title(): string */ public function completed(): bool { - return count($this->handledJobs) >= $this->source->getTotalChunks(); + $completed = count($this->handledJobs) >= $this->source->getTotalChunks(); + + // Clear tracked_jobs table, after all jobs are finished. + if ($completed) { + /** @var string $column */ + $column = config('trackable-jobs.using_uuid', false) ? 'uuid' : 'id'; + TrackedJob::query()->where($column, $this->handledJobs)->delete(); + } + + return $completed; } /** diff --git a/tests/Integration/Jobs/Stages/PullFromSourceTest.php b/tests/Integration/Jobs/Stages/PullFromSourceTest.php index 74378673..b9f1af79 100644 --- a/tests/Integration/Jobs/Stages/PullFromSourceTest.php +++ b/tests/Integration/Jobs/Stages/PullFromSourceTest.php @@ -41,6 +41,7 @@ public function test_put_all_entites_to_index(): void ]; $response = $this->elasticsearch->search($params); $this->assertEquals($productsAmount, $response['hits']['total']['value']); + $this->assertDatabaseEmpty('tracked_jobs'); } public function test_dont_put_entities_if_no_entities_in_collection(): void From 339dd626c2087b6dec01695fc021c4609377b898 Mon Sep 17 00:00:00 2001 From: Davis Puciriuss Date: Wed, 15 May 2024 10:19:23 +0300 Subject: [PATCH 08/12] readme --- README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 5e5b4466..63374962 100644 --- a/README.md +++ b/README.md @@ -185,7 +185,11 @@ While working in production, to keep your existing search experience available w `php artisan scout:import` -The command create new temporary index, import all models to it, and then switch to the index and remove old index. +The command creates new temporary index, imports all models to it, and then switches to the index and remove old index. + +### Parallel import +When importing massive ammounts of data, you can use the option `--parallel`, to speed up the import process. +This however requires you to set-up the suggested trackable-jobs package and queue workers. ### Search From a1ae08089a8f563fa12e778f6d1ec3fcb57a7d6d Mon Sep 17 00:00:00 2001 From: Davis Puciriuss Date: Wed, 15 May 2024 12:19:59 +0300 Subject: [PATCH 09/12] styleci --- src/Engines/ElasticSearchEngine.php | 1 + src/Jobs/ProcessSearchable.php | 2 +- tests/Feature/ImportCommandTest.php | 6 +++--- tests/TestCase.php | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/Engines/ElasticSearchEngine.php b/src/Engines/ElasticSearchEngine.php index e43a9acb..6e5a96ea 100644 --- a/src/Engines/ElasticSearchEngine.php +++ b/src/Engines/ElasticSearchEngine.php @@ -120,6 +120,7 @@ public function mapIds($results) if (! isset($hits['hits'])) { return collect(); } + return collect($hits['hits'])->pluck('_id'); } diff --git a/src/Jobs/ProcessSearchable.php b/src/Jobs/ProcessSearchable.php index fb753d0e..f0709354 100644 --- a/src/Jobs/ProcessSearchable.php +++ b/src/Jobs/ProcessSearchable.php @@ -49,4 +49,4 @@ public function handle(): void $engine->update($this->data); } -} \ No newline at end of file +} diff --git a/tests/Feature/ImportCommandTest.php b/tests/Feature/ImportCommandTest.php index 1dd52891..f90285f7 100644 --- a/tests/Feature/ImportCommandTest.php +++ b/tests/Feature/ImportCommandTest.php @@ -135,7 +135,7 @@ public function test_import_all_pages_in_parallel(): void Product::setEventDispatcher($dispatcher); Artisan::call('scout:import', [ - '--parallel' => true + '--parallel' => true, ]); $params = [ 'index' => (new Product())->searchableAs(), @@ -257,7 +257,7 @@ public function test_remove_old_index_after_switching_to_new_in_parallel_import( Product::setEventDispatcher($dispatcher); Artisan::call('scout:import', [ - '--parallel' => true + '--parallel' => true, ]); $this->assertFalse($this->elasticsearch->indices()->exists(['index' => 'products_old'])->asBool(), 'Old index must be deleted'); @@ -306,7 +306,7 @@ public function test_progress_report_in_parallel(): void $output = new BufferedOutput(); Artisan::call('scout:import', [ - '--parallel' => true + '--parallel' => true, ], $output); $output = array_map('trim', explode("\n", $output->fetch())); diff --git a/tests/TestCase.php b/tests/TestCase.php index b3899a9b..6d4b5e5f 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -22,7 +22,7 @@ public function setUp(): void if (class_exists(\Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class)) { \Artisan::call('vendor:publish', [ - '--tag' => 'trackable-jobs-config' + '--tag' => 'trackable-jobs-config', ]); } From 2294d6d34c7a8842abf71e5bcdb2dca14784c999 Mon Sep 17 00:00:00 2001 From: Davis Puciriuss Date: Tue, 21 May 2024 13:05:37 +0300 Subject: [PATCH 10/12] Added jobs, to stop in-progress index. Cleans up tracked_job db table. --- src/Jobs/ImportStages.php | 4 ++ src/Jobs/ProcessSearchable.php | 12 +++++ src/Jobs/Stages/CleanUpTrackedJobs.php | 51 ++++++++++++++++++++ src/Jobs/Stages/PullFromSourceParallel.php | 20 ++++---- src/Jobs/Stages/StopTrackedJobs.php | 55 ++++++++++++++++++++++ tests/Integration/Jobs/ImportTest.php | 18 +++---- 6 files changed, 142 insertions(+), 18 deletions(-) create mode 100644 src/Jobs/Stages/CleanUpTrackedJobs.php create mode 100644 src/Jobs/Stages/StopTrackedJobs.php diff --git a/src/Jobs/ImportStages.php b/src/Jobs/ImportStages.php index b15a776e..ed52cff3 100644 --- a/src/Jobs/ImportStages.php +++ b/src/Jobs/ImportStages.php @@ -11,6 +11,8 @@ use Matchish\ScoutElasticSearch\Jobs\Stages\RefreshIndex; use Matchish\ScoutElasticSearch\Jobs\Stages\StageInterface; use Matchish\ScoutElasticSearch\Jobs\Stages\SwitchToNewAndRemoveOldIndex; +use Matchish\ScoutElasticSearch\Jobs\Stages\StopTrackedJobs; +use Matchish\ScoutElasticSearch\Jobs\Stages\CleanUpTrackedJobs; use Matchish\ScoutElasticSearch\Searchable\ImportSource; /** @@ -29,9 +31,11 @@ public static function fromSource(ImportSource $source, bool $parallel = false) if ($parallel && class_exists(\Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class)) { return (new self([ + new StopTrackedJobs($source), new CleanUp($source), new CreateWriteIndex($source, $index), PullFromSourceParallel::chunked($source), + new CleanUpTrackedJobs($source), new RefreshIndex($index), new SwitchToNewAndRemoveOldIndex($source, $index), ]))->flatten()->filter(); diff --git a/src/Jobs/ProcessSearchable.php b/src/Jobs/ProcessSearchable.php index f0709354..3179dbec 100644 --- a/src/Jobs/ProcessSearchable.php +++ b/src/Jobs/ProcessSearchable.php @@ -12,6 +12,7 @@ use Illuminate\Queue\SerializesModels; use Illuminate\Support\Collection; use Junges\TrackableJobs\Concerns\Trackable; +use Junges\TrackableJobs\Models\TrackedJob; use Laravel\Scout\Searchable; class ProcessSearchable implements ShouldQueue @@ -31,6 +32,11 @@ class ProcessSearchable implements ShouldQueue public function __construct(Collection $data) { $this->__baseConstruct($data->first()); + + $this->trackedJob->update([ + 'trackable_type' => $data->first()->searchableAs(), + ]); + $this->data = $data; } @@ -41,6 +47,12 @@ public function __construct(Collection $data) */ public function handle(): void { + $this->trackedJob = $this->trackedJob->fresh(); + if ($this->trackedJob == null || $this->trackedJob->finished_at !== null) { + + return; + } + /** @var Model|Searchable $model */ $model = $this->data->first(); diff --git a/src/Jobs/Stages/CleanUpTrackedJobs.php b/src/Jobs/Stages/CleanUpTrackedJobs.php new file mode 100644 index 00000000..ae23c2c2 --- /dev/null +++ b/src/Jobs/Stages/CleanUpTrackedJobs.php @@ -0,0 +1,51 @@ +source = $source; + } + + public function handle(Client $elasticsearch): void + { + TrackedJob::where('trackable_type', $this->source->searchableAs())->delete(); + } + + public function title(): string + { + return 'Cleaning up tracked jobs records for this index.'; + } + + public function estimate(): int + { + return 1; + } + + public function advance(): int + { + return 1; + } + + public function completed(): bool + { + return true; + } +} diff --git a/src/Jobs/Stages/PullFromSourceParallel.php b/src/Jobs/Stages/PullFromSourceParallel.php index 2df8791f..cec5b4aa 100644 --- a/src/Jobs/Stages/PullFromSourceParallel.php +++ b/src/Jobs/Stages/PullFromSourceParallel.php @@ -76,6 +76,15 @@ public function handle(Client $elasticsearch = null): void { if (count($this->dispatchedJobIds) > 0) { $jobs = TrackedJob::findMany($this->dispatchedJobIds); + $failedJobs = $jobs->filter(function ($job) { + return $job->status === TrackedJob::STATUS_FAILED; + }); + if ($failedJobs->isNotEmpty()) { + $jobs->each(function (TrackedJob $job) { + $job->markAsFailed(); + }); + throw new \Exception('Failed to process jobs: ' . implode(', ', $failedJobs->pluck('id')->toArray())); + } $finishedJobs = $jobs->filter(function ($job) { return $job->status === TrackedJob::STATUS_FINISHED; }); @@ -140,16 +149,7 @@ public function title(): string */ public function completed(): bool { - $completed = count($this->handledJobs) >= $this->source->getTotalChunks(); - - // Clear tracked_jobs table, after all jobs are finished. - if ($completed) { - /** @var string $column */ - $column = config('trackable-jobs.using_uuid', false) ? 'uuid' : 'id'; - TrackedJob::query()->where($column, $this->handledJobs)->delete(); - } - - return $completed; + return count($this->handledJobs) >= $this->source->getTotalChunks(); } /** diff --git a/src/Jobs/Stages/StopTrackedJobs.php b/src/Jobs/Stages/StopTrackedJobs.php new file mode 100644 index 00000000..1c22cf0f --- /dev/null +++ b/src/Jobs/Stages/StopTrackedJobs.php @@ -0,0 +1,55 @@ +source = $source; + } + + public function handle(Client $elasticsearch): void + { + TrackedJob::query() + ->where('trackable_type', $this->source->searchableAs()) + ->each(function (TrackedJob $job) { + $job->markAsFailed('New import started on the same index.'); + }); + } + + public function title(): string + { + return 'Stopping queued jobs for this index.'; + } + + public function estimate(): int + { + return 1; + } + + public function advance(): int + { + return 1; + } + + public function completed(): bool + { + return true; + } +} diff --git a/tests/Integration/Jobs/ImportTest.php b/tests/Integration/Jobs/ImportTest.php index b909423c..161f2fd0 100644 --- a/tests/Integration/Jobs/ImportTest.php +++ b/tests/Integration/Jobs/ImportTest.php @@ -66,14 +66,16 @@ public function test_progress_report_parallel() dispatch($job); $this->assertEquals([ - 'Clean up 1/7', - 'Create write index 2/7', - 'Indexing... 2/7', - 'Indexing... 3/7', - 'Indexing... 4/7', - 'Indexing... 5/7', - 'Refreshing index 6/7', - 'Switching to the new index 7/7', + 'Stopping queued jobs for this index. 1/9', + 'Clean up 2/9', + 'Create write index 3/9', + 'Indexing... 3/9', + 'Indexing... 4/9', + 'Indexing... 5/9', + 'Indexing... 6/9', + 'Cleaning up tracked jobs records for this index. 7/9', + 'Refreshing index 8/9', + 'Switching to the new index 9/9', ], $output->getLogs()); } } From c6015fc92e024e041361d11d3a415a234dda3d33 Mon Sep 17 00:00:00 2001 From: Davis Puciriuss Date: Tue, 21 May 2024 13:07:06 +0300 Subject: [PATCH 11/12] styleci --- src/Jobs/ImportStages.php | 4 ++-- src/Jobs/ProcessSearchable.php | 2 -- src/Jobs/Stages/PullFromSourceParallel.php | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/Jobs/ImportStages.php b/src/Jobs/ImportStages.php index ed52cff3..9cbc3e63 100644 --- a/src/Jobs/ImportStages.php +++ b/src/Jobs/ImportStages.php @@ -5,14 +5,14 @@ use Illuminate\Support\Collection; use Matchish\ScoutElasticSearch\ElasticSearch\Index; use Matchish\ScoutElasticSearch\Jobs\Stages\CleanUp; +use Matchish\ScoutElasticSearch\Jobs\Stages\CleanUpTrackedJobs; use Matchish\ScoutElasticSearch\Jobs\Stages\CreateWriteIndex; use Matchish\ScoutElasticSearch\Jobs\Stages\PullFromSource; use Matchish\ScoutElasticSearch\Jobs\Stages\PullFromSourceParallel; use Matchish\ScoutElasticSearch\Jobs\Stages\RefreshIndex; use Matchish\ScoutElasticSearch\Jobs\Stages\StageInterface; -use Matchish\ScoutElasticSearch\Jobs\Stages\SwitchToNewAndRemoveOldIndex; use Matchish\ScoutElasticSearch\Jobs\Stages\StopTrackedJobs; -use Matchish\ScoutElasticSearch\Jobs\Stages\CleanUpTrackedJobs; +use Matchish\ScoutElasticSearch\Jobs\Stages\SwitchToNewAndRemoveOldIndex; use Matchish\ScoutElasticSearch\Searchable\ImportSource; /** diff --git a/src/Jobs/ProcessSearchable.php b/src/Jobs/ProcessSearchable.php index 3179dbec..98495f58 100644 --- a/src/Jobs/ProcessSearchable.php +++ b/src/Jobs/ProcessSearchable.php @@ -12,7 +12,6 @@ use Illuminate\Queue\SerializesModels; use Illuminate\Support\Collection; use Junges\TrackableJobs\Concerns\Trackable; -use Junges\TrackableJobs\Models\TrackedJob; use Laravel\Scout\Searchable; class ProcessSearchable implements ShouldQueue @@ -49,7 +48,6 @@ public function handle(): void { $this->trackedJob = $this->trackedJob->fresh(); if ($this->trackedJob == null || $this->trackedJob->finished_at !== null) { - return; } diff --git a/src/Jobs/Stages/PullFromSourceParallel.php b/src/Jobs/Stages/PullFromSourceParallel.php index cec5b4aa..c26cf14e 100644 --- a/src/Jobs/Stages/PullFromSourceParallel.php +++ b/src/Jobs/Stages/PullFromSourceParallel.php @@ -83,7 +83,7 @@ public function handle(Client $elasticsearch = null): void $jobs->each(function (TrackedJob $job) { $job->markAsFailed(); }); - throw new \Exception('Failed to process jobs: ' . implode(', ', $failedJobs->pluck('id')->toArray())); + throw new \Exception('Failed to process jobs: '.implode(', ', $failedJobs->pluck('id')->toArray())); } $finishedJobs = $jobs->filter(function ($job) { return $job->status === TrackedJob::STATUS_FINISHED; From 98421534fcb83031e00f7e41b3ab2a10c3844f1e Mon Sep 17 00:00:00 2001 From: Davis Puciriuss Date: Wed, 29 May 2024 17:28:03 +0300 Subject: [PATCH 12/12] changelog update --- CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c78b23b3..6237710b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/) ## [Unreleased] +## [8.0.0-alpha.1] - 2024-05-29 +### Added +- fromScope, uses forPageAfterId. +- a new option "--parallel" for ImportCommand, can only be used with [Trackable-Jobs](https://github.com/mateusjunges/trackable-jobs-for-laravel) package currently. + +## Changed +- StageInterface now has two more functions. Both are used to make PullFromSource stage an iterable one. +- ImportSource interface now has three more functions. Chunk scope can now be set from a stage. +- When possible (model without a custom key), by default fromScope is used instead of pageScope. + ## [7.6.1] - 2024-05-14 ### Fixed - fix for [parser incompatibility](https://github.com/matchish/laravel-scout-elasticsearch/issues/273)