diff --git a/.github/workflows/test-application.yaml b/.github/workflows/test-application.yaml index 5c976b49..f974a020 100644 --- a/.github/workflows/test-application.yaml +++ b/.github/workflows/test-application.yaml @@ -65,6 +65,7 @@ jobs: run: | composer validate --strict composer install --no-interaction --prefer-dist + composer require mateusjunges/laravel-trackable-jobs --no-interaction - name: Run tests run: vendor/bin/phpunit --coverage-clover=coverage.xml diff --git a/.gitignore b/.gitignore index aa4fb319..c5ae7a65 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ composer.lock .env .phpunit.result.cache -/vendor/ \ No newline at end of file +/vendor/ +/build/ \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index c78b23b3..6237710b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/) ## [Unreleased] +## [8.0.0-alpha.1] - 2024-05-29 +### Added +- fromScope, uses forPageAfterId. +- A new option "--parallel" for ImportCommand; it can currently only be used with the [Trackable-Jobs](https://github.com/mateusjunges/trackable-jobs-for-laravel) package. + +### Changed +- StageInterface now has two more functions. Both are used to make PullFromSource stage an iterable one. +- ImportSource interface now has three more functions. Chunk scope can now be set from a stage. +- When possible (model without a custom key), by default fromScope is used instead of pageScope. 
+ ## [7.6.1] - 2024-05-14 ### Fixed - fix for [parser incompatibility](https://github.com/matchish/laravel-scout-elasticsearch/issues/273) diff --git a/Makefile b/Makefile index 3bca6dd4..131c6fe8 100644 --- a/Makefile +++ b/Makefile @@ -27,24 +27,28 @@ up: ## Start all containers (in background) for development $(docker_compose_bin) up -d down: ## Stop all started for development containers - $(docker_compose_bin) down + $(docker_compose_bin) down -v restart: up ## Restart all started for development containers $(docker_compose_bin) restart +rebuild: down + $(docker_compose_bin) up --build -d + shell: up ## Start shell into application container $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" /bin/sh install: up ## Install application dependencies into application container $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" composer install --no-interaction --ansi -test: up ## Execute application tests +larastan: up $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" ./vendor/bin/phpstan analyze --memory-limit=4000M + +test: up ## Execute application tests $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" ./vendor/bin/phpunit --testdox --stop-on-failure test-coverage: up ## Execute application tests and generate report - $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" ./vendor/bin/phpstan analyze $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" ./vendor/bin/phpunit --coverage-html build/coverage-report test-filter: - $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" ./vendor/bin/phpunit --filter=$(filter) --testdox + $(docker_compose_bin) exec "$(APP_CONTAINER_NAME)" ./vendor/bin/phpunit --filter=$(filter) --testdox \ No newline at end of file diff --git a/README.md b/README.md index 5e5b4466..63374962 100644 --- a/README.md +++ b/README.md @@ -185,7 +185,11 @@ While working in production, to keep your existing search experience available w `php artisan scout:import` -The command create new temporary index, import all models to it, and then switch to the 
index and remove old index. +The command creates a new temporary index, imports all models into it, and then switches to the new index and removes the old index. + +### Parallel import +When importing massive amounts of data, you can use the `--parallel` option to speed up the import process. +This, however, requires you to set up the suggested trackable-jobs package and queue workers. ### Search diff --git a/composer.json b/composer.json index 20848228..b8a7829d 100644 --- a/composer.json +++ b/composer.json @@ -23,8 +23,8 @@ "roave/better-reflection": "^4.3|^5.0|^6.18|^6.36" }, "require-dev": { + "larastan/larastan": "^2.9", "laravel/legacy-factories": "^1.0", - "nunomaduro/larastan": "^2.4", "orchestra/testbench": "^6.17|^7.0|^8.0", "php-http/guzzle7-adapter": "^1.0", "phpunit/phpunit": "^9.4.0" @@ -49,6 +49,9 @@ ] } }, + "suggest": { + "mateusjunges/laravel-trackable-jobs": "Required when using --parallel option in ImportCommand." + }, "config": { "sort-packages": true, "preferred-install": "dist" diff --git a/docker-compose.yml b/docker-compose.yml index a843ddf3..f39271cc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3.4' - services: app: build: @@ -50,6 +48,7 @@ services: - bootstrap.memory_lock=true - action.destructive_requires_name=false - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + - xpack.security.enabled=false ulimits: memlock: soft: -1 diff --git a/docker/app/Dockerfile b/docker/app/Dockerfile index 0c5577c3..2a3ae386 100644 --- a/docker/app/Dockerfile +++ b/docker/app/Dockerfile @@ -41,9 +41,10 @@ RUN echo "xdebug.remote_connect_back=0" >> /usr/local/etc/php/conf.d/docker-php- RUN echo "xdebug.idekey=\"PHPSTORM\"" >> /usr/local/etc/php/conf.d/docker-php-ext-xdebug.ini RUN echo "xdebug.remote_port=9000" >> /usr/local/etc/php/conf.d/docker-php-ext-xdebug.ini RUN echo "xdebug.remote_autostart=1" >> /usr/local/etc/php/conf.d/docker-php-ext-xdebug.ini +RUN echo "xdebug.mode=debug,coverage" >> 
/usr/local/etc/php/conf.d/docker-php-ext-xdebug.ini RUN apt-get update && apt-get install -y procps -RUN apt-get update && apt-get install -y netcat +RUN apt-get update && apt-get install -y netcat-traditional RUN apt-get update && apt-get install -y net-tools FROM php as php-testing diff --git a/phpstan.neon.dist b/phpstan.neon.dist index 8143f7a6..5345d5fb 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -1,5 +1,5 @@ includes: - - vendor/nunomaduro/larastan/extension.neon + - vendor/larastan/larastan/extension.neon parameters: level: max paths: diff --git a/src/Console/Commands/ImportCommand.php b/src/Console/Commands/ImportCommand.php index fbdf0675..9e09b849 100644 --- a/src/Console/Commands/ImportCommand.php +++ b/src/Console/Commands/ImportCommand.php @@ -9,7 +9,6 @@ use Matchish\ScoutElasticSearch\ElasticSearch\Config\Config; use Matchish\ScoutElasticSearch\Jobs\Import; use Matchish\ScoutElasticSearch\Jobs\QueueableJob; -use Matchish\ScoutElasticSearch\Searchable\ImportSource; use Matchish\ScoutElasticSearch\Searchable\ImportSourceFactory; use Matchish\ScoutElasticSearch\Searchable\SearchableListFactory; @@ -18,7 +17,8 @@ final class ImportCommand extends Command /** * {@inheritdoc} */ - protected $signature = 'scout:import {searchable?* : The name of the searchable}'; + protected $signature = 'scout:import {searchable?* : The name of the searchable} {--P|parallel : Index items in parallel}'; + /** * {@inheritdoc} */ @@ -29,12 +29,22 @@ final class ImportCommand extends Command */ public function handle(): void { + $parallel = false; + + if ($this->option('parallel')) { + $parallel = true; + } + $this->searchableList((array) $this->argument('searchable')) - ->each(function ($searchable) { - $this->import($searchable); + ->each(function (string $searchable) use ($parallel) { + $this->import($searchable, $parallel); }); } + /** + * @param array $argument + * @return Collection + */ private function searchableList(array $argument): Collection { return 
collect($argument)->whenEmpty(function () { @@ -44,18 +54,31 @@ private function searchableList(array $argument): Collection }); } - private function import(string $searchable): void + /** + * @param string $searchable + * @param bool $parallel + * @return void + */ + private function import(string $searchable, bool $parallel): void { $sourceFactory = app(ImportSourceFactory::class); $source = $sourceFactory::from($searchable); $job = new Import($source); - $job->timeout = Config::queueTimeout(); + /** @var int|null $queueTimeout */ + $queueTimeout = Config::queueTimeout(); + if ($queueTimeout !== null) { + $job->timeout = (int) $queueTimeout; + } + $job->parallel = $parallel; if (config('scout.queue')) { $job = (new QueueableJob())->chain([$job]); - $job->timeout = Config::queueTimeout(); + /** @var int|null $queueTimeout */ + $queueTimeout = Config::queueTimeout(); + if ($queueTimeout !== null) { + $job->timeout = (int) $queueTimeout; + } } - $bar = (new ProgressBarFactory($this->output))->create(); $job->withProgressReport($bar); diff --git a/src/Database/Scopes/ChunkScope.php b/src/Database/Scopes/ChunkScope.php deleted file mode 100644 index 049c4917..00000000 --- a/src/Database/Scopes/ChunkScope.php +++ /dev/null @@ -1,56 +0,0 @@ -start = $start; - $this->end = $end; - } - - /** - * Apply the scope to a given Eloquent query builder. - * - * @param \Illuminate\Database\Eloquent\Builder $builder - * @param \Illuminate\Database\Eloquent\Model $model - * @return void - */ - public function apply(Builder $builder, Model $model) - { - $start = $this->start; - $end = $this->end; - $builder - ->when(! is_null($start), function ($query) use ($start, $model) { - return $query->where($model->getKeyName(), '>', $start); - }) - ->when(! 
is_null($end), function ($query) use ($end, $model) { - return $query->where($model->getKeyName(), '<=', $end); - }); - } - - public function key(): string - { - return static::class; - } -} diff --git a/src/Database/Scopes/FromScope.php b/src/Database/Scopes/FromScope.php new file mode 100644 index 00000000..70bff430 --- /dev/null +++ b/src/Database/Scopes/FromScope.php @@ -0,0 +1,44 @@ +lastId = $lastId; + $this->perPage = $perPage; + } + + /** + * Apply the scope to a given Eloquent query builder. + * + * @param Builder $builder + * @param Model $model + * @return void + */ + public function apply(Builder $builder, Model $model) + { + $builder->forPageAfterId($this->perPage, $this->lastId); + } +} diff --git a/src/Database/Scopes/PageScope.php b/src/Database/Scopes/PageScope.php index 6522f0d7..50b8414f 100644 --- a/src/Database/Scopes/PageScope.php +++ b/src/Database/Scopes/PageScope.php @@ -5,6 +5,7 @@ use Illuminate\Database\Eloquent\Builder; use Illuminate\Database\Eloquent\Model; use Illuminate\Database\Eloquent\Scope; +use Laravel\Scout\Searchable; class PageScope implements Scope { @@ -32,8 +33,8 @@ public function __construct(int $page, int $perPage) /** * Apply the scope to a given Eloquent query builder. 
* - * @param \Illuminate\Database\Eloquent\Builder $builder - * @param \Illuminate\Database\Eloquent\Model $model + * @param Builder $builder + * @param Model $model * @return void */ public function apply(Builder $builder, Model $model) diff --git a/src/ElasticSearch/Config/Storage.php b/src/ElasticSearch/Config/Storage.php index 87abf2d4..0bc427c1 100644 --- a/src/ElasticSearch/Config/Storage.php +++ b/src/ElasticSearch/Config/Storage.php @@ -28,7 +28,15 @@ public static function load(string $config): Storage */ public function hosts(): array { - return explode(',', $this->loadConfig('host')); + /** @var mixed $hostConfig */ + $hostConfig = $this->loadConfig('host'); + + if (is_string($hostConfig)) { + /** @var string $hostConfig */ + return explode(',', $hostConfig); + } + + return []; } /** @@ -36,7 +44,10 @@ public function hosts(): array */ public function user(): ?string { - return $this->loadConfig('user'); + /** @var string|null $userConfig */ + $userConfig = $this->loadConfig('user'); + + return $userConfig; } /** @@ -44,7 +55,10 @@ public function user(): ?string */ public function password(): ?string { - return $this->loadConfig('password'); + /** @var string|null $passwordConfig */ + $passwordConfig = $this->loadConfig('password'); + + return $passwordConfig; } /** @@ -52,7 +66,10 @@ public function password(): ?string */ public function elasticCloudId(): ?string { - return $this->loadConfig('cloud_id'); + /** @var string|null $cloudConfig */ + $cloudConfig = $this->loadConfig('cloud_id'); + + return $cloudConfig; } /** @@ -60,7 +77,10 @@ public function elasticCloudId(): ?string */ public function apiKey(): ?string { - return $this->loadConfig('api_key'); + /** @var string|null $apiConfig */ + $apiConfig = $this->loadConfig('api_key'); + + return $apiConfig; } /** @@ -68,7 +88,13 @@ public function apiKey(): ?string */ public function queueTimeout(): ?int { - return (int) $this->loadConfig('queue.timeout') ?: null; + $queueTimeoutConfig = 
$this->loadConfig('queue.timeout'); + + if (is_numeric($queueTimeoutConfig)) { + return intval($queueTimeoutConfig); + } + + return null; } /** diff --git a/src/ElasticSearch/EloquentHitsIteratorAggregate.php b/src/ElasticSearch/EloquentHitsIteratorAggregate.php index 7918191c..938453cc 100644 --- a/src/ElasticSearch/EloquentHitsIteratorAggregate.php +++ b/src/ElasticSearch/EloquentHitsIteratorAggregate.php @@ -2,6 +2,7 @@ namespace Matchish\ScoutElasticSearch\ElasticSearch; +use Illuminate\Database\Eloquent\Model; use IteratorAggregate; use Laravel\Scout\Builder; use Laravel\Scout\Searchable; @@ -41,14 +42,15 @@ public function __construct(array $results, callable $callback = null) * * @since 5.0.0 */ - public function getIterator() + public function getIterator(): Traversable { $hits = collect(); if ($this->results['hits']['total']) { + /** @var array> */ $hits = $this->results['hits']['hits']; $models = collect($hits)->groupBy('_source.__class_name') ->map(function ($results, $class) { - /** @var Searchable $model */ + /** @var Model&Searchable $model */ $model = new $class; $model->setKeyType('string'); $builder = new Builder($model, ''); @@ -60,11 +62,20 @@ public function getIterator() $builder, $results->pluck('_id')->all() ); }) - ->flatten()->keyBy(function ($model) { + ->flatten()->keyBy(function (Model|Searchable $model) { return get_class($model).'::'.$model->getScoutKey(); }); $hits = collect($hits)->map(function ($hit) use ($models) { - $key = $hit['_source']['__class_name'].'::'.$hit['_id']; + /** @var array $hit */ + if (! isset($hit['_source'], $hit['_id'])) { + return null; + } + $source = $hit['_source']; + if (! isset($source['__class_name'])) { + return null; + } + + $key = $source['__class_name'].'::'.$hit['_id']; return isset($models[$key]) ? 
$models[$key] : null; })->filter()->all(); diff --git a/src/ElasticSearch/HitsIteratorAggregate.php b/src/ElasticSearch/HitsIteratorAggregate.php index 20032ea4..042e03c6 100644 --- a/src/ElasticSearch/HitsIteratorAggregate.php +++ b/src/ElasticSearch/HitsIteratorAggregate.php @@ -4,7 +4,14 @@ interface HitsIteratorAggregate extends \IteratorAggregate { + /** + * @param array $results + * @param callable|null $callback + */ public function __construct(array $results, callable $callback = null); - public function getIterator(); + /** + * {@inheritDoc} + */ + public function getIterator(): \Traversable; } diff --git a/src/ElasticSearch/Index.php b/src/ElasticSearch/Index.php index c14d0433..f4ca7e0d 100644 --- a/src/ElasticSearch/Index.php +++ b/src/ElasticSearch/Index.php @@ -94,7 +94,9 @@ public static function fromSource(ImportSource $source): Index 'number_of_replicas' => 0, ]; + /** @var array $settings */ $settings = config($settingsConfigKey, config('elasticsearch.indices.settings.default', $defaultSettings)); + /** @var array $mappings */ $mappings = config($mappingsConfigKey, config('elasticsearch.indices.mappings.default')); return new static($name, $settings, $mappings); diff --git a/src/ElasticSearch/Params/Bulk.php b/src/ElasticSearch/Params/Bulk.php index 475ece03..0a48fd8c 100644 --- a/src/ElasticSearch/Params/Bulk.php +++ b/src/ElasticSearch/Params/Bulk.php @@ -2,6 +2,9 @@ namespace Matchish\ScoutElasticSearch\ElasticSearch\Params; +use Illuminate\Database\Eloquent\Model; +use Laravel\Scout\Searchable; + /** * @internal */ @@ -27,6 +30,7 @@ public function delete($docs): void $this->delete($doc); } } else { + /** @var Model|Searchable $docs */ $this->deleteDocs[$docs->getScoutKey()] = $docs; } } @@ -98,6 +102,7 @@ public function index($docs): void $this->index($doc); } } else { + /** @var Model|Searchable $docs */ $this->indexDocs[$docs->getScoutKey()] = $docs; } } diff --git a/src/ElasticSearch/SearchFactory.php b/src/ElasticSearch/SearchFactory.php 
index 7e560847..4ac5ac7a 100644 --- a/src/ElasticSearch/SearchFactory.php +++ b/src/ElasticSearch/SearchFactory.php @@ -107,6 +107,6 @@ private static function hasWheres($builder): bool */ private static function hasWhereIns($builder): bool { - return isset($builder->whereIns) && ! empty($builder->whereIns); + return ! empty($builder->whereIns); } } diff --git a/src/ElasticSearchServiceProvider.php b/src/ElasticSearchServiceProvider.php index 6fb5dd87..b461b66b 100644 --- a/src/ElasticSearchServiceProvider.php +++ b/src/ElasticSearchServiceProvider.php @@ -23,10 +23,12 @@ public function register(): void $this->app->bind(Client::class, function () { $clientBuilder = ClientBuilder::create()->setHosts(Config::hosts()); if ($user = Config::user()) { + /** @var string $user */ $clientBuilder->setBasicAuthentication($user, Config::password()); } if ($cloudId = Config::elasticCloudId()) { + /** @var string $cloudId */ $clientBuilder->setElasticCloudId($cloudId) ->setApiKey(Config::apiKey()); } diff --git a/src/Engines/ElasticSearchEngine.php b/src/Engines/ElasticSearchEngine.php index 46637e14..6e5a96ea 100644 --- a/src/Engines/ElasticSearchEngine.php +++ b/src/Engines/ElasticSearchEngine.php @@ -4,11 +4,14 @@ use Elastic\Elasticsearch\Client; use Elastic\Elasticsearch\Exception\ServerResponseException; +use Elastic\Elasticsearch\Response\Elasticsearch; use Illuminate\Database\Eloquent\Collection; +use Illuminate\Database\Eloquent\Model; use Illuminate\Support\LazyCollection; use Laravel\Scout\Builder; use Laravel\Scout\Builder as BaseBuilder; use Laravel\Scout\Engines\Engine; +use Laravel\Scout\Searchable; use Matchish\ScoutElasticSearch\ElasticSearch\HitsIteratorAggregate; use Matchish\ScoutElasticSearch\ElasticSearch\Params\Bulk; use Matchish\ScoutElasticSearch\ElasticSearch\Params\Indices\Refresh; @@ -39,21 +42,29 @@ public function __construct(Client $elasticsearch) } /** - * {@inheritdoc} + * @param Collection $models */ public function update($models) { $params = 
new Bulk(); $params->index($models); - $response = $this->elasticsearch->bulk($params->toArray())->asArray(); + /** @var Elasticsearch $elasticResponse */ + $elasticResponse = $this->elasticsearch->bulk($params->toArray()); + $response = $elasticResponse->asArray(); if (array_key_exists('errors', $response) && $response['errors']) { - $error = new ServerResponseException(json_encode($response, JSON_PRETTY_PRINT)); + /** @var string|bool $json */ + $json = json_encode($response, JSON_PRETTY_PRINT); + if ($json === false) { + throw new \Exception('Bulk update error'); + } + /** @var string $json */ + $error = new ServerResponseException($json); throw new \Exception('Bulk update error', $error->getCode(), $error); } } /** - * {@inheritdoc} + * @param Collection $models */ public function delete($models) { @@ -63,12 +74,14 @@ public function delete($models) } /** - * {@inheritdoc} + * @param Model|Searchable $model */ public function flush($model) { $indexName = $model->searchableAs(); - $exist = $this->elasticsearch->indices()->exists(['index' => $indexName])->asBool(); + /** @var Elasticsearch $response */ + $response = $this->elasticsearch->indices()->exists(['index' => $indexName]); + $exist = $response->asBool(); if ($exist) { $body = (new Search())->addQuery(new MatchAllQuery())->toArray(); $params = new SearchParams($indexName, $body); @@ -98,10 +111,17 @@ public function paginate(BaseBuilder $builder, $perPage, $page) /** * {@inheritdoc} + * + * @return Collection */ public function mapIds($results) { - return collect($results['hits']['hits'])->pluck('_id'); + $hits = isset($results['hits']) ? $results['hits'] : []; + if (! 
isset($hits['hits'])) { + return collect(); + } + + return collect($hits['hits'])->pluck('_id'); } /** diff --git a/src/Jobs/Import.php b/src/Jobs/Import.php index 026e5737..65985aee 100644 --- a/src/Jobs/Import.php +++ b/src/Jobs/Import.php @@ -21,6 +21,10 @@ final class Import * @var ImportSource */ private $source; + /** + * @var bool + */ + public $parallel = false; public ?int $timeout = null; @@ -40,16 +44,31 @@ public function handle(Client $elasticsearch): void $stages = $this->stages(); $estimate = $stages->sum->estimate(); $this->progressBar()->setMaxSteps($estimate); - $stages->each(function ($stage) use ($elasticsearch) { - /** @var StageInterface $stage */ - $this->progressBar()->setMessage($stage->title()); - $stage->handle($elasticsearch); - $this->progressBar()->advance($stage->estimate()); - }); + + /** @var StageInterface $currentStage */ + $currentStage = $stages->shift(); + + while ($currentStage !== null) { + $this->progressBar()->setMessage($currentStage->title()); + $currentStage->handle($elasticsearch); + $this->progressBar()->advance($currentStage->advance()); + if ($currentStage->completed()) { + if ($stages->isEmpty()) { + /** @var null $currentStage */ + $currentStage = null; + } else { + /** @var StageInterface $currentStage */ + $currentStage = $stages->shift(); + } + } + } } + /** + * @return Collection + */ private function stages(): Collection { - return ImportStages::fromSource($this->source); + return ImportStages::fromSource($this->source, $this->parallel); } } diff --git a/src/Jobs/ImportStages.php b/src/Jobs/ImportStages.php index 78fe5632..9cbc3e63 100644 --- a/src/Jobs/ImportStages.php +++ b/src/Jobs/ImportStages.php @@ -5,22 +5,42 @@ use Illuminate\Support\Collection; use Matchish\ScoutElasticSearch\ElasticSearch\Index; use Matchish\ScoutElasticSearch\Jobs\Stages\CleanUp; +use Matchish\ScoutElasticSearch\Jobs\Stages\CleanUpTrackedJobs; use Matchish\ScoutElasticSearch\Jobs\Stages\CreateWriteIndex; use 
Matchish\ScoutElasticSearch\Jobs\Stages\PullFromSource; +use Matchish\ScoutElasticSearch\Jobs\Stages\PullFromSourceParallel; use Matchish\ScoutElasticSearch\Jobs\Stages\RefreshIndex; +use Matchish\ScoutElasticSearch\Jobs\Stages\StageInterface; +use Matchish\ScoutElasticSearch\Jobs\Stages\StopTrackedJobs; use Matchish\ScoutElasticSearch\Jobs\Stages\SwitchToNewAndRemoveOldIndex; use Matchish\ScoutElasticSearch\Searchable\ImportSource; +/** + * @extends Collection + */ class ImportStages extends Collection { /** * @param ImportSource $source - * @return Collection + * @param bool $parallel + * @return Collection */ - public static function fromSource(ImportSource $source) + public static function fromSource(ImportSource $source, bool $parallel = false) { $index = Index::fromSource($source); + if ($parallel && class_exists(\Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class)) { + return (new self([ + new StopTrackedJobs($source), + new CleanUp($source), + new CreateWriteIndex($source, $index), + PullFromSourceParallel::chunked($source), + new CleanUpTrackedJobs($source), + new RefreshIndex($index), + new SwitchToNewAndRemoveOldIndex($source, $index), + ]))->flatten()->filter(); + } + return (new self([ new CleanUp($source), new CreateWriteIndex($source, $index), diff --git a/src/Jobs/ProcessSearchable.php b/src/Jobs/ProcessSearchable.php new file mode 100644 index 00000000..98495f58 --- /dev/null +++ b/src/Jobs/ProcessSearchable.php @@ -0,0 +1,62 @@ + + */ + private $data; + + /** + * @param Collection $data + */ + public function __construct(Collection $data) + { + $this->__baseConstruct($data->first()); + + $this->trackedJob->update([ + 'trackable_type' => $data->first()->searchableAs(), + ]); + + $this->data = $data; + } + + /** + * Handles the job execution. 
+ * + * @return void + */ + public function handle(): void + { + $this->trackedJob = $this->trackedJob->fresh(); + if ($this->trackedJob == null || $this->trackedJob->finished_at !== null) { + return; + } + + /** @var Model|Searchable $model */ + $model = $this->data->first(); + + /** @var \Laravel\Scout\Engines\Engine $engine */ + $engine = $model->searchableUsing(); + + $engine->update($this->data); + } +} diff --git a/src/Jobs/QueueableJob.php b/src/Jobs/QueueableJob.php index d799f7de..d000b61e 100644 --- a/src/Jobs/QueueableJob.php +++ b/src/Jobs/QueueableJob.php @@ -8,8 +8,7 @@ class QueueableJob implements ShouldQueue { - use Queueable; - use ProgressReportable; + use Queueable, ProgressReportable; public ?int $timeout = null; diff --git a/src/Jobs/Stages/CleanUp.php b/src/Jobs/Stages/CleanUp.php index da674c9d..238ee2dc 100644 --- a/src/Jobs/Stages/CleanUp.php +++ b/src/Jobs/Stages/CleanUp.php @@ -4,6 +4,7 @@ use Elastic\Elasticsearch\Client; use Elastic\Elasticsearch\Exception\ClientResponseException; +use Elastic\Elasticsearch\Response\Elasticsearch; use Matchish\ScoutElasticSearch\ElasticSearch\Params\Indices\Alias\Get as GetAliasParams; use Matchish\ScoutElasticSearch\ElasticSearch\Params\Indices\Delete as DeleteIndexParams; use Matchish\ScoutElasticSearch\Searchable\ImportSource; @@ -31,7 +32,9 @@ public function handle(Client $elasticsearch): void $source = $this->source; $params = GetAliasParams::anyIndex($source->searchableAs()); try { - $response = $elasticsearch->indices()->getAlias($params->toArray())->asArray(); + /** @var Elasticsearch $elasticResponse */ + $elasticResponse = $elasticsearch->indices()->getAlias($params->toArray()); + $response = $elasticResponse->asArray(); } catch (ClientResponseException $e) { $response = []; } @@ -55,4 +58,14 @@ public function estimate(): int { return 1; } + + public function advance(): int + { + return 1; + } + + public function completed(): bool + { + return true; + } } diff --git 
a/src/Jobs/Stages/CleanUpTrackedJobs.php b/src/Jobs/Stages/CleanUpTrackedJobs.php new file mode 100644 index 00000000..ae23c2c2 --- /dev/null +++ b/src/Jobs/Stages/CleanUpTrackedJobs.php @@ -0,0 +1,51 @@ +source = $source; + } + + public function handle(Client $elasticsearch): void + { + TrackedJob::where('trackable_type', $this->source->searchableAs())->delete(); + } + + public function title(): string + { + return 'Cleaning up tracked jobs records for this index.'; + } + + public function estimate(): int + { + return 1; + } + + public function advance(): int + { + return 1; + } + + public function completed(): bool + { + return true; + } +} diff --git a/src/Jobs/Stages/CreateWriteIndex.php b/src/Jobs/Stages/CreateWriteIndex.php index 1db3b48d..5ee111f9 100644 --- a/src/Jobs/Stages/CreateWriteIndex.php +++ b/src/Jobs/Stages/CreateWriteIndex.php @@ -61,4 +61,14 @@ public function estimate(): int { return 1; } + + public function advance(): int + { + return 1; + } + + public function completed(): bool + { + return true; + } } diff --git a/src/Jobs/Stages/PullFromSource.php b/src/Jobs/Stages/PullFromSource.php index 6e6a7c5e..6e8e3d13 100644 --- a/src/Jobs/Stages/PullFromSource.php +++ b/src/Jobs/Stages/PullFromSource.php @@ -3,7 +3,8 @@ namespace Matchish\ScoutElasticSearch\Jobs\Stages; use Elastic\Elasticsearch\Client; -use Illuminate\Support\Collection; +use Matchish\ScoutElasticSearch\Database\Scopes\FromScope; +use Matchish\ScoutElasticSearch\Database\Scopes\PageScope; use Matchish\ScoutElasticSearch\Searchable\ImportSource; /** @@ -16,6 +17,11 @@ final class PullFromSource implements StageInterface */ private $source; + /** + * @var int + */ + private $handledChunks = 0; + /** * @param ImportSource $source */ @@ -26,13 +32,24 @@ public function __construct(ImportSource $source) public function handle(Client $elasticsearch = null): void { + $this->handledChunks++; $results = $this->source->get()->filter->shouldBeSearchable(); if (! 
$results->isEmpty()) { $results->first()->searchableUsing()->update($results); + if ($results->first()->getKeyType() !== 'int') { + $this->source->setChunkScope(new PageScope($this->handledChunks, $this->source->getChunkSize())); + } else { + $this->source->setChunkScope(new FromScope($results->last()->getKey(), $this->source->getChunkSize())); + } } } public function estimate(): int + { + return $this->source->getTotalChunks() + 1; + } + + public function advance(): int { return 1; } @@ -42,14 +59,22 @@ public function title(): string return 'Indexing...'; } + public function completed(): bool + { + return ($this->handledChunks - 1) >= $this->source->getTotalChunks(); + } + /** * @param ImportSource $source - * @return Collection + * @return PullFromSource */ - public static function chunked(ImportSource $source): Collection + public static function chunked(ImportSource $source): ?PullFromSource { - return $source->chunked()->map(function ($chunk) { - return new static($chunk); - }); + $source = $source->chunked(); + if ($source === null) { + return null; + } + + return new static($source); } } diff --git a/src/Jobs/Stages/PullFromSourceParallel.php b/src/Jobs/Stages/PullFromSourceParallel.php new file mode 100644 index 00000000..c26cf14e --- /dev/null +++ b/src/Jobs/Stages/PullFromSourceParallel.php @@ -0,0 +1,169 @@ + + */ + private $queues = [ + 'import_1', + ]; + + /** + * @param ImportSource $source + */ + public function __construct(ImportSource $source) + { + $this->source = $source; + $this->queues = collect(config('scout.chunk.handlers', self::DEFAULT_HANDLER_COUNT))->map(function ($i) { + return 'import_'.$i; + })->toArray(); + } + + /** + * @return string + */ + private function getNextQueue(): string + { + /** @var string $queue */ + $queue = array_shift($this->queues); + $this->queues[] = $queue; + + return $queue; + } + + /** + * {@inheritdoc} + */ + public function handle(Client $elasticsearch = null): void + { + if (count($this->dispatchedJobIds) > 
0) { + $jobs = TrackedJob::findMany($this->dispatchedJobIds); + $failedJobs = $jobs->filter(function ($job) { + return $job->status === TrackedJob::STATUS_FAILED; + }); + if ($failedJobs->isNotEmpty()) { + $jobs->each(function (TrackedJob $job) { + $job->markAsFailed(); + }); + throw new \Exception('Failed to process jobs: '.implode(', ', $failedJobs->pluck('id')->toArray())); + } + $finishedJobs = $jobs->filter(function ($job) { + return $job->status === TrackedJob::STATUS_FINISHED; + }); + $this->handledJobs = array_merge($this->handledJobs, $finishedJobs->pluck('id')->toArray()); + $this->advanceBy += $finishedJobs->count(); + $this->dispatchedJobIds = $jobs->filter(function ($job) { + return $job->status !== TrackedJob::STATUS_FINISHED; + })->pluck('id')->toArray(); + } + + if (count($this->dispatchedJobIds) >= $this->source->getTotalChunks()) { + return; + } + + $results = $this->source->get()->filter->shouldBeSearchable(); + + if (! $results->isEmpty()) { + $job = new ProcessSearchable($results); + dispatch($job)->onQueue($this->getNextQueue())->onConnection($this->source->syncWithSearchUsing()); + $this->dispatchedJobIds[] = $job->trackedJob->getKey(); + if ($results->first()->getKeyType() !== 'int') { + $this->source->setChunkScope( + new PageScope( + count($this->handledJobs) + count($this->dispatchedJobIds), + $this->source->getChunkSize()) + ); + } else { + $this->source->setChunkScope(new FromScope($results->last()->getKey(), $this->source->getChunkSize())); + } + } + } + + /** + * {@inheritdoc} + */ + public function estimate(): int + { + return $this->source->getTotalChunks(); + } + + /** + * {@inheritdoc} + */ + public function advance(): int + { + $advance = $this->advanceBy; + $this->advanceBy = 0; + + return $advance; + } + + /** + * {@inheritdoc} + */ + public function title(): string + { + return 'Indexing...'; + } + + /** + * {@inheritdoc} + */ + public function completed(): bool + { + return count($this->handledJobs) >= 
$this->source->getTotalChunks(); + } + + /** + * @param ImportSource $source + * @return PullFromSourceParallel|null + */ + public static function chunked(ImportSource $source): PullFromSourceParallel|null + { + $source = $source->chunked(); + + if ($source === null) { + return null; + } + + return new static($source); + } +} diff --git a/src/Jobs/Stages/RefreshIndex.php b/src/Jobs/Stages/RefreshIndex.php index 6a9e7aae..5ea4eac8 100644 --- a/src/Jobs/Stages/RefreshIndex.php +++ b/src/Jobs/Stages/RefreshIndex.php @@ -41,4 +41,14 @@ public function title(): string { return 'Refreshing index'; } + + public function advance(): int + { + return 1; + } + + public function completed(): bool + { + return true; + } } diff --git a/src/Jobs/Stages/StageInterface.php b/src/Jobs/Stages/StageInterface.php index e5cff307..04157214 100644 --- a/src/Jobs/Stages/StageInterface.php +++ b/src/Jobs/Stages/StageInterface.php @@ -6,9 +6,29 @@ interface StageInterface { + /** + * @return string + */ public function title(): string; + /** + * @return int + */ public function estimate(): int; + /** + * @return int + */ + public function advance(): int; + + /** + * @return bool + */ + public function completed(): bool; + + /** + * @param Client $elasticsearch + * @return void + */ public function handle(Client $elasticsearch): void; } diff --git a/src/Jobs/Stages/StopTrackedJobs.php b/src/Jobs/Stages/StopTrackedJobs.php new file mode 100644 index 00000000..1c22cf0f --- /dev/null +++ b/src/Jobs/Stages/StopTrackedJobs.php @@ -0,0 +1,55 @@ +source = $source; + } + + public function handle(Client $elasticsearch): void + { + TrackedJob::query() + ->where('trackable_type', $this->source->searchableAs()) + ->each(function (TrackedJob $job) { + $job->markAsFailed('New import started on the same index.'); + }); + } + + public function title(): string + { + return 'Stopping queued jobs for this index.'; + } + + public function estimate(): int + { + return 1; + } + + public function advance(): int + { 
+ return 1; + } + + public function completed(): bool + { + return true; + } +} diff --git a/src/Jobs/Stages/SwitchToNewAndRemoveOldIndex.php b/src/Jobs/Stages/SwitchToNewAndRemoveOldIndex.php index eae35aac..da218d04 100644 --- a/src/Jobs/Stages/SwitchToNewAndRemoveOldIndex.php +++ b/src/Jobs/Stages/SwitchToNewAndRemoveOldIndex.php @@ -1,8 +1,11 @@ index = $index; } + /** + * {@inheritdoc} + */ public function handle(Client $elasticsearch): void { $source = $this->source; $params = Get::anyIndex($source->searchableAs()); - $response = $elasticsearch->indices()->getAlias($params->toArray())->asArray(); + /** @var Elasticsearch $elasticResponse */ + $elasticResponse = $elasticsearch->indices()->getAlias($params->toArray()); + $response = $elasticResponse->asArray(); $params = new Update(); foreach ($response as $indexName => $alias) { @@ -49,13 +58,35 @@ public function handle(Client $elasticsearch): void $elasticsearch->indices()->updateAliases($params->toArray()); } + /** + * {@inheritdoc} + */ public function estimate(): int { return 1; } + /** + * {@inheritdoc} + */ public function title(): string { return 'Switching to the new index'; } + + /** + * {@inheritdoc} + */ + public function advance(): int + { + return 1; + } + + /** + * {@inheritdoc} + */ + public function completed(): bool + { + return true; + } } diff --git a/src/Searchable/DefaultImportSource.php b/src/Searchable/DefaultImportSource.php index 0de226b9..c5943b70 100644 --- a/src/Searchable/DefaultImportSource.php +++ b/src/Searchable/DefaultImportSource.php @@ -1,78 +1,148 @@ */ private $scopes; + /** + * @var int + */ + private $totalChunks; + + /** + * @var int + */ + private $chunkSize; + + /** + * @var Scope|null + */ + private $chunkScope; + /** * DefaultImportSource constructor. 
* * @param string $className - * @param array $scopes + * @param array $scopes + * @param Scope|null $chunkScope */ - public function __construct(string $className, array $scopes = []) + public function __construct(string $className, array $scopes = [], Scope|null $chunkScope = null) { $this->className = $className; $this->scopes = $scopes; + $this->chunkScope = $chunkScope; + $this->chunkSize = self::DEFAULT_CHUNK_SIZE; + $this->totalChunks = 1; } + /** + * {@inheritdoc} + */ public function syncWithSearchUsingQueue(): ?string { return $this->model()->syncWithSearchUsingQueue(); } + /** + * {@inheritdoc} + */ public function syncWithSearchUsing(): ?string { return $this->model()->syncWithSearchUsing(); } + /** + * {@inheritdoc} + */ public function searchableAs(): string { return $this->model()->searchableAs(); } - public function chunked(): Collection + /** + * {@inheritdoc} + */ + public function getTotalChunks(): int + { + return $this->totalChunks; + } + + /** + * {@inheritdoc} + */ + public function getChunkSize(): int + { + return $this->chunkSize; + } + + /** + * {@inheritdoc} + */ + public function setChunkScope(Scope $scope): void + { + $this->chunkScope = $scope; + } + + /** + * {@inheritdoc} + */ + public function chunked(): ?ImportSource { $query = $this->newQuery(); $totalSearchables = $query->count(); if ($totalSearchables) { - $chunkSize = (int) config('scout.chunk.searchable', self::DEFAULT_CHUNK_SIZE); - $totalChunks = (int) ceil($totalSearchables / $chunkSize); - - return collect(range(1, $totalChunks))->map(function ($page) use ($chunkSize) { - $chunkScope = new PageScope($page, $chunkSize); - - return new static($this->className, array_merge($this->scopes, [$chunkScope])); - }); - } else { - return collect(); + $configChunkSize = config('scout.chunk.searchable', self::DEFAULT_CHUNK_SIZE); + $this->chunkSize = is_numeric($configChunkSize) ? 
intval($configChunkSize) : self::DEFAULT_CHUNK_SIZE; + $this->totalChunks = (int) ceil($totalSearchables / $this->chunkSize); + if ($this->model()->getKeyType() !== 'int') { + $this->chunkScope = new PageScope(0, $this->chunkSize); + } else { + $this->chunkScope = new FromScope(0, $this->chunkSize); + } + + return $this; } + + return null; } /** - * @return mixed + * @return Model|Searchable */ private function model() { + /** @var Model|Searchable */ return new $this->className; } + /** + * @return Builder<\Illuminate\Database\Eloquent\Model> + */ private function newQuery(): Builder { $query = $this->className::makeAllSearchableUsing($this->model()->newQuery()); @@ -82,8 +152,13 @@ private function newQuery(): Builder return $query->withTrashed(); }) ->orderBy($this->model()->getQualifiedKeyName()); + $scopes = $this->scopes; + if ($this->chunkScope) { + $scopes = array_merge($scopes, [$this->chunkScope]); + } + return collect($scopes)->reduce(function ($instance, $scope) { $instance->withGlobalScope(get_class($scope), $scope); @@ -91,9 +166,12 @@ private function newQuery(): Builder }, $query); } + /** + * {@inheritdoc} + */ public function get(): EloquentCollection { - /** @var EloquentCollection $models */ + /** @var EloquentCollection $models */ $models = $this->newQuery()->get(); return $models; diff --git a/src/Searchable/ImportSource.php b/src/Searchable/ImportSource.php index 874fdb53..405cc01b 100644 --- a/src/Searchable/ImportSource.php +++ b/src/Searchable/ImportSource.php @@ -1,19 +1,52 @@ + */ public function get(): EloquentCollection; } diff --git a/src/Searchable/SearchableListFactory.php b/src/Searchable/SearchableListFactory.php index ac34ffab..988f4beb 100644 --- a/src/Searchable/SearchableListFactory.php +++ b/src/Searchable/SearchableListFactory.php @@ -9,6 +9,7 @@ use Laravel\Scout\Searchable; use PhpParser\Error; use PhpParser\Node; +use PhpParser\Node\Name; use PhpParser\Node\Stmt\Class_; use PhpParser\NodeFinder; use 
PhpParser\NodeTraverser; @@ -61,7 +62,7 @@ public function getErrors(): array } /** - * @return Collection + * @return Collection */ public function make(): Collection { @@ -88,7 +89,7 @@ private function find(): array private function getSearchableClasses(): array { if (self::$searchableClasses === null) { - self::$searchableClasses = $this->getProjectClasses()->filter(function ($class) { + self::$searchableClasses = $this->getProjectClasses()->filter(function (string $class) { return $this->findSearchableTraitRecursively($class); })->toArray(); } @@ -97,7 +98,7 @@ private function getSearchableClasses(): array } /** - * @return Collection + * @return Collection */ private function getProjectClasses(): Collection { @@ -106,8 +107,11 @@ private function getProjectClasses(): Collection return $node instanceof Class_; }); - return Collection::make($nodes)->map(function ($node) { - return $node->namespacedName->toCodeString(); + return Collection::make($nodes)->map(function (Class_ $node) { + $namespace = $node->namespacedName; + if ($namespace instanceof Name) { + return $namespace->toCodeString(); + } }); } @@ -131,6 +135,7 @@ private function getStmts(): array $stmts = Collection::make($stmts)->flatten(1)->toArray(); + /** @var \PhpParser\Node[] $stmts */ return $nodeTraverser->traverse($stmts); } diff --git a/tests/Feature/ImportCommandTest.php b/tests/Feature/ImportCommandTest.php index 32dd00ff..f90285f7 100644 --- a/tests/Feature/ImportCommandTest.php +++ b/tests/Feature/ImportCommandTest.php @@ -72,6 +72,33 @@ public function test_import_entites_in_queue(): void $this->assertEquals($productsAmount, $response['hits']['total']['value']); } + public function test_import_entites_in_parallel(): void + { + $this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']); + + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = random_int(1, 5); + factory(Product::class, 
$productsAmount)->create(); + + Product::setEventDispatcher($dispatcher); + + Artisan::call('scout:import', [ + '--parallel' => true, + ]); + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals($productsAmount, $response['hits']['total']['value']); + } + public function test_import_all_pages(): void { $dispatcher = Product::getEventDispatcher(); @@ -96,20 +123,78 @@ public function test_import_all_pages(): void $this->assertEquals($productsAmount, $response['hits']['total']['value']); } + public function test_import_all_pages_in_parallel(): void + { + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = 10; + + factory(Product::class, $productsAmount)->create(); + + Product::setEventDispatcher($dispatcher); + + Artisan::call('scout:import', [ + '--parallel' => true, + ]); + $params = [ + 'index' => (new Product())->searchableAs(), + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals($productsAmount, $response['hits']['total']['value']); + } + public function test_import_with_custom_key_all_pages(): void { $this->app['config']['scout.key'] = 'title'; - $dispatcher = Book::getEventDispatcher(); - Book::unsetEventDispatcher(); + $dispatcher = BookWithCustomKey::getEventDispatcher(); + BookWithCustomKey::unsetEventDispatcher(); $booksAmount = 10; - factory(Book::class, $booksAmount)->create(); + factory(BookWithCustomKey::class, $booksAmount)->create(); - Book::setEventDispatcher($dispatcher); + BookWithCustomKey::setEventDispatcher($dispatcher); - Artisan::call('scout:import'); + Artisan::call('scout:import', [ + 'searchable' => BookWithCustomKey::class, + ]); + $params = [ + 'index' => (new BookWithCustomKey())->searchableAs(), + 'body' => [ + 'query' => [ + 'match_all' => new 
stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals($booksAmount, $response['hits']['total']['value']); + } + + public function test_import_with_custom_key_all_pages_in_parallel(): void + { + $this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']); + $this->app['config']['scout.key'] = 'title'; + + $dispatcher = BookWithCustomKey::getEventDispatcher(); + BookWithCustomKey::unsetEventDispatcher(); + + $booksAmount = 10; + + factory(BookWithCustomKey::class, $booksAmount)->create(); + + BookWithCustomKey::setEventDispatcher($dispatcher); + + Artisan::call('scout:import', [ + 'searchable' => BookWithCustomKey::class, + '--parallel' => true, + ]); $params = [ 'index' => (new BookWithCustomKey())->searchableAs(), 'body' => [ @@ -149,6 +234,35 @@ public function test_remove_old_index_after_switching_to_new(): void $this->assertFalse($this->elasticsearch->indices()->exists(['index' => 'products_old'])->asBool(), 'Old index must be deleted'); } + public function test_remove_old_index_after_switching_to_new_in_parallel_import(): void + { + $params = [ + 'index' => 'products_old', + 'body' => [ + 'aliases' => ['products' => new stdClass()], + 'settings' => [ + 'number_of_shards' => 1, + 'number_of_replicas' => 0, + ], + ], + ]; + $this->elasticsearch->indices()->create($params); + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = random_int(1, 5); + + factory(Product::class, $productsAmount)->create(); + + Product::setEventDispatcher($dispatcher); + + Artisan::call('scout:import', [ + '--parallel' => true, + ]); + + $this->assertFalse($this->elasticsearch->indices()->exists(['index' => 'products_old'])->asBool(), 'Old index must be deleted'); + } + public function test_progress_report(): void { $output = new BufferedOutput(); @@ -186,6 +300,21 @@ public function test_progress_report_in_queue(): void $this->assertContains('[OK] 
'.trans('scout::import.done.queue', ['searchable' => Product::class]), $output); } + public function test_progress_report_in_parallel(): void + { + $this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']); + + $output = new BufferedOutput(); + Artisan::call('scout:import', [ + '--parallel' => true, + ], $output); + + $output = array_map('trim', explode("\n", $output->fetch())); + + $this->assertContains(trans('scout::import.start', ['searchable' => Product::class]), $output); + $this->assertContains('[OK] '.trans('scout::import.done.queue', ['searchable' => Product::class]), $output); + } + public function test_queue_timeout_configuration(): void { Bus::fake([ diff --git a/tests/Integration/Engines/ElasticSearchEngineTest.php b/tests/Integration/Engines/ElasticSearchEngineTest.php index f208e7f5..c21e6514 100644 --- a/tests/Integration/Engines/ElasticSearchEngineTest.php +++ b/tests/Integration/Engines/ElasticSearchEngineTest.php @@ -170,6 +170,20 @@ public function test_lazy_map_for_mixed_search(): void }); } + public function test_create_index(): void + { + $this->expectException(\Error::class); + $this->expectExceptionMessage('Not implemented'); + $this->engine->createIndex('products'); + } + + public function test_delete_index(): void + { + $this->expectException(\Error::class); + $this->expectExceptionMessage('Not implemented'); + $this->engine->deleteIndex('products'); + } + private function refreshIndex(string $index): void { $params = [ diff --git a/tests/Integration/Jobs/ImportTest.php b/tests/Integration/Jobs/ImportTest.php index 5e5789a4..161f2fd0 100644 --- a/tests/Integration/Jobs/ImportTest.php +++ b/tests/Integration/Jobs/ImportTest.php @@ -33,13 +33,49 @@ public function test_progress_report() dispatch($job); $this->assertEquals([ - 'Clean up 1/7', - 'Create write index 2/7', - 'Indexing... 3/7', - 'Indexing... 4/7', - 'Indexing... 
5/7', - 'Refreshing index 6/7', - 'Switching to the new index 7/7', + 'Clean up 1/8', + 'Create write index 2/8', + 'Indexing... 3/8', + 'Indexing... 4/8', + 'Indexing... 5/8', + 'Indexing... 6/8', + 'Refreshing index 7/8', + 'Switching to the new index 8/8', + ], $output->getLogs()); + } + + public function test_progress_report_parallel() + { + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + $productsAmount = 7; + factory(Product::class, $productsAmount)->create(); + Product::setEventDispatcher($dispatcher); + + $job = new Import(DefaultImportSourceFactory::from(Product::class)); + $job->parallel = true; + $output = new DummyOutput(); + $outputStyle = new OutputStyle(new ArrayInput([]), $output); + $progressBar = $outputStyle->createProgressBar(); + $progressBar->setRedrawFrequency(1); + $progressBar->maxSecondsBetweenRedraws(0); + $progressBar->minSecondsBetweenRedraws(0); + $progressBar->setFormat('[%message%] %current%/%max%'); + $job->withProgressReport($progressBar); + + dispatch($job); + + $this->assertEquals([ + 'Stopping queued jobs for this index. 1/9', + 'Clean up 2/9', + 'Create write index 3/9', + 'Indexing... 3/9', + 'Indexing... 4/9', + 'Indexing... 5/9', + 'Indexing... 6/9', + 'Cleaning up tracked jobs records for this index. 
7/9', + 'Refreshing index 8/9', + 'Switching to the new index 9/9', ], $output->getLogs()); } } diff --git a/tests/Integration/Jobs/Stages/PullFromSourceParallelTest.php b/tests/Integration/Jobs/Stages/PullFromSourceParallelTest.php new file mode 100644 index 00000000..3c135235 --- /dev/null +++ b/tests/Integration/Jobs/Stages/PullFromSourceParallelTest.php @@ -0,0 +1,204 @@ +create(); + + Product::setEventDispatcher($dispatcher); + $this->elasticsearch->indices()->create([ + 'index' => 'products_index', + 'body' => ['aliases' => ['products' => new stdClass()]], + ]); + $stage = new PullFromSourceParallel(DefaultImportSourceFactory::from(Product::class)); + $stage->handle(); + $this->elasticsearch->indices()->refresh([ + 'index' => 'products', + ]); + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals($productsAmount, $response['hits']['total']['value']); + } + + public function test_dont_put_entities_if_no_entities_in_collection(): void + { + $this->elasticsearch->indices()->create([ + 'index' => 'products_index', + 'body' => ['aliases' => ['products' => new stdClass()]], + ]); + $stage = new PullFromSourceParallel(DefaultImportSourceFactory::from(Product::class)); + $stage->handle(); + $this->elasticsearch->indices()->refresh([ + 'index' => 'products', + ]); + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals(0, $response['hits']['total']['value']); + } + + public function test_put_all_to_index_if_amount_of_entities_more_than_chunk_size(): void + { + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = 20; + $this->app['config']->set('scout.chunk.searchable', 5); + + factory(Product::class, $productsAmount)->create(); + + 
Product::setEventDispatcher($dispatcher); + $this->elasticsearch->indices()->create([ + 'index' => 'products_index', + 'body' => ['aliases' => ['products' => new stdClass()]], + ]); + $stage = new PullFromSourceParallel(DefaultImportSourceFactory::from(Product::class)); + $stage->handle(); + $this->elasticsearch->indices()->refresh([ + 'index' => 'products', + ]); + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals($productsAmount, $response['hits']['total']['value']); + } + + public function test_pull_soft_delete_meta_data() + { + $this->app['config']['scout.soft_delete'] = true; + + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = rand(1, 5); + + factory(Product::class, $productsAmount)->create(); + + Product::setEventDispatcher($dispatcher); + $this->elasticsearch->indices()->create([ + 'index' => 'products_index', + 'body' => ['aliases' => ['products' => new stdClass()]], + ]); + $stage = new PullFromSourceParallel(DefaultImportSourceFactory::from(Product::class)); + $stage->handle(); + $this->elasticsearch->indices()->refresh([ + 'index' => 'products', + ]); + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals(0, $response['hits']['hits'][0]['_source']['__soft_deleted']); + } + + public function test_pull_soft_deleted() + { + $this->app['config']['scout.soft_delete'] = true; + + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = 3; + + factory(Product::class, $productsAmount)->create(); + + Product::limit(1)->get()->first()->delete(); + + Product::setEventDispatcher($dispatcher); + $this->elasticsearch->indices()->create([ + 'index' => 'products_index', + 'body' => ['aliases' => 
['products' => new stdClass()]], + ]); + $stage = PullFromSourceParallel::chunked(DefaultImportSourceFactory::from(Product::class)); + $stage->handle(); + $this->elasticsearch->indices()->refresh([ + 'index' => 'products', + ]); + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals(3, $response['hits']['total']['value']); + } + + public function test_no_searchables_no_chunks() + { + $stage = PullFromSourceParallel::chunked(DefaultImportSourceFactory::from(Product::class)); + + $this->assertEquals(null, $stage); + } + + public function test_chunked_pull_only_one_page() + { + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = 5; + + factory(Product::class, $productsAmount)->create(); + + Product::setEventDispatcher($dispatcher); + + $chunks = PullFromSourceParallel::chunked(DefaultImportSourceFactory::from(Product::class)); + $chunks->handle(); + $this->elasticsearch->indices()->refresh([ + 'index' => 'products', + ]); + $params = [ + 'index' => 'products', + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + + $this->assertEquals(3, $response['hits']['total']['value']); + } +} diff --git a/tests/Integration/Jobs/Stages/PullFromSourceTest.php b/tests/Integration/Jobs/Stages/PullFromSourceTest.php index d99f7bb6..b9f1af79 100644 --- a/tests/Integration/Jobs/Stages/PullFromSourceTest.php +++ b/tests/Integration/Jobs/Stages/PullFromSourceTest.php @@ -41,6 +41,7 @@ public function test_put_all_entites_to_index(): void ]; $response = $this->elasticsearch->search($params); $this->assertEquals($productsAmount, $response['hits']['total']['value']); + $this->assertDatabaseEmpty('tracked_jobs'); } public function test_dont_put_entities_if_no_entities_in_collection(): void @@ -149,8 +150,8 @@ public function 
test_pull_soft_deleted() 'index' => 'products_index', 'body' => ['aliases' => ['products' => new stdClass()]], ]); - $stages = PullFromSource::chunked(DefaultImportSourceFactory::from(Product::class)); - $stages->first()->handle(); + $stage = PullFromSource::chunked(DefaultImportSourceFactory::from(Product::class)); + $stage->handle(); $this->elasticsearch->indices()->refresh([ 'index' => 'products', ]); @@ -168,9 +169,9 @@ public function test_pull_soft_deleted() public function test_no_searchables_no_chunks() { - $stages = PullFromSource::chunked(DefaultImportSourceFactory::from(Product::class)); + $stage = PullFromSource::chunked(DefaultImportSourceFactory::from(Product::class)); - $this->assertEquals(0, $stages->count()); + $this->assertEquals(null, $stage); } public function test_chunked_pull_only_one_page() @@ -185,7 +186,7 @@ public function test_chunked_pull_only_one_page() Product::setEventDispatcher($dispatcher); $chunks = PullFromSource::chunked(DefaultImportSourceFactory::from(Product::class)); - $chunks->first()->handle(); + $chunks->handle(); $this->elasticsearch->indices()->refresh([ 'index' => 'products', ]); diff --git a/tests/Integration/Searchable/DefaultImportSourceTest.php b/tests/Integration/Searchable/DefaultImportSourceTest.php index caa8eaf4..94d283da 100644 --- a/tests/Integration/Searchable/DefaultImportSourceTest.php +++ b/tests/Integration/Searchable/DefaultImportSourceTest.php @@ -27,6 +27,23 @@ public function test_new_query_has_injected_scopes() $products = $source->get(); $this->assertEquals($iphonePromoUsedAmount, $products->count()); } + + public function test_new_query_has_injected_chunk_scope() + { + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $iphonePromoUsedAmount = rand(1, 5); + $iphonePromoNewAmount = rand(6, 10); + + factory(Product::class, $iphonePromoUsedAmount)->states(['iphone', 'promo', 'used'])->create(); + factory(Product::class, $iphonePromoNewAmount)->states(['iphone', 
'promo', 'new'])->create(); + + Product::setEventDispatcher($dispatcher); + $source = new DefaultImportSource(Product::class, [], new UsedScope()); + $products = $source->get(); + $this->assertEquals($iphonePromoUsedAmount, $products->count()); + } } class UsedScope implements Scope diff --git a/tests/IntegrationTestCase.php b/tests/IntegrationTestCase.php index d8e78e21..f4bc4249 100644 --- a/tests/IntegrationTestCase.php +++ b/tests/IntegrationTestCase.php @@ -32,6 +32,9 @@ protected function getEnvironmentSetUp($app) parent::getEnvironmentSetUp($app); $app['config']->set('elasticsearch', require(__DIR__.'/../config/elasticsearch.php')); + if (class_exists(\Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class)) { + $app['config']->set('trackable-job', require(__DIR__.'/../vendor/mateusjunges/laravel-trackable-jobs/config/trackable-jobs.php')); + } $app['config']->set('elasticsearch.indices.mappings.products', [ 'properties' => [ 'type' => [ diff --git a/tests/TestCase.php b/tests/TestCase.php index d0fb948f..6d4b5e5f 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -4,7 +4,6 @@ namespace Tests; -use Illuminate\Support\Facades\Artisan; use Laravel\Scout\ScoutServiceProvider; use Matchish\ScoutElasticSearch\ElasticSearchServiceProvider; use Matchish\ScoutElasticSearch\Engines\ElasticSearchEngine; @@ -21,7 +20,13 @@ public function setUp(): void $this->withFactories(database_path('factories')); - Artisan::call('migrate:fresh', ['--database' => 'mysql']); + if (class_exists(\Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class)) { + \Artisan::call('vendor:publish', [ + '--tag' => 'trackable-jobs-config', + ]); + } + + \Artisan::call('migrate:fresh', ['--database' => 'mysql']); } /** @@ -34,6 +39,7 @@ protected function getEnvironmentSetUp($app) { $app['config']->set('scout.driver', ElasticSearchEngine::class); $app['config']->set('scout.chunk.searchable', 3); + $app['config']->set('scout.chunk.handlers', 1); 
$app['config']->set('scout.queue', false); // Setup default database to use sqlite :memory: $app['config']->set('database.default', 'mysql'); @@ -41,10 +47,17 @@ protected function getEnvironmentSetUp($app) protected function getPackageProviders($app) { - return [ + /** @var array $providers */ + $providers = [ ScoutServiceProvider::class, ScoutElasticSearchServiceProvider::class, ElasticSearchServiceProvider::class, ]; + + if (class_exists(\Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class)) { + $providers[] = \Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class; + } + + return $providers; } } diff --git a/tests/Unit/ElasticSearch/Config/ConfigTest.php b/tests/Unit/ElasticSearch/Config/ConfigTest.php index d45ce9e7..acac7723 100644 --- a/tests/Unit/ElasticSearch/Config/ConfigTest.php +++ b/tests/Unit/ElasticSearch/Config/ConfigTest.php @@ -14,6 +14,10 @@ public function test_parse_host(): void $config = new ScoutConfig(); $this->assertEquals(['http://localhost:9200'], $config::hosts()); + + Config::set('elasticsearch.host', 9001); + $config = new ScoutConfig(); + $this->assertEquals([], $config::hosts()); } public function test_parse_multihost(): void diff --git a/tests/Unit/Jobs/ImportStagesTest.php b/tests/Unit/Jobs/ImportStagesTest.php index cd055c71..94fdc528 100644 --- a/tests/Unit/Jobs/ImportStagesTest.php +++ b/tests/Unit/Jobs/ImportStagesTest.php @@ -20,22 +20,19 @@ public function test_no_pull_stages_if_no_searchables() $this->assertEquals(4, $stages->count()); $this->assertInstanceOf(CleanUp::class, $stages->get(0)); $this->assertInstanceOf(CreateWriteIndex::class, $stages->get(1)); - $this->assertInstanceOf(RefreshIndex::class, $stages->get(2)); - $this->assertInstanceOf(SwitchToNewAndRemoveOldIndex::class, $stages->get(3)); + $this->assertInstanceOf(RefreshIndex::class, $stages->get(3)); + $this->assertInstanceOf(SwitchToNewAndRemoveOldIndex::class, $stages->get(4)); } public function test_stages() { 
factory(Product::class, 10)->create(); $stages = ImportStages::fromSource(DefaultImportSourceFactory::from(Product::class)); - $this->assertEquals(8, $stages->count()); + $this->assertEquals(5, $stages->count()); $this->assertInstanceOf(CleanUp::class, $stages->get(0)); $this->assertInstanceOf(CreateWriteIndex::class, $stages->get(1)); $this->assertInstanceOf(PullFromSource::class, $stages->get(2)); - $this->assertInstanceOf(PullFromSource::class, $stages->get(3)); - $this->assertInstanceOf(PullFromSource::class, $stages->get(4)); - $this->assertInstanceOf(PullFromSource::class, $stages->get(5)); - $this->assertInstanceOf(RefreshIndex::class, $stages->get(6)); - $this->assertInstanceOf(SwitchToNewAndRemoveOldIndex::class, $stages->get(7)); + $this->assertInstanceOf(RefreshIndex::class, $stages->get(3)); + $this->assertInstanceOf(SwitchToNewAndRemoveOldIndex::class, $stages->get(4)); } } diff --git a/tests/Unit/Searchable/SearchableListFactoryTest.php b/tests/Unit/Searchable/SearchableListFactoryTest.php index 16fa3f97..66653afa 100644 --- a/tests/Unit/Searchable/SearchableListFactoryTest.php +++ b/tests/Unit/Searchable/SearchableListFactoryTest.php @@ -20,6 +20,7 @@ public function test_only_load_seachable_classes() // There are 5 searchable models: Book, BookWithCustomKey, Product, Ticket and Post $this->assertCount(5, $searchable); + $this->assertIsArray($factory->getErrors()); } public function test_find_searchable_trait_within_trait() diff --git a/tests/laravel/app/BookWithCustomKey.php b/tests/laravel/app/BookWithCustomKey.php index 5e5f5c09..eec33d88 100644 --- a/tests/laravel/app/BookWithCustomKey.php +++ b/tests/laravel/app/BookWithCustomKey.php @@ -13,6 +13,11 @@ class BookWithCustomKey extends Book 'year', ]; + public function getKeyType() + { + return 'string'; + } + public function getKeyName() { return 'custom_key'; diff --git a/tests/laravel/app/Library/CustomHitsIteratorAggregate.php b/tests/laravel/app/Library/CustomHitsIteratorAggregate.php index 
99bc2ee5..6626a11c 100644 --- a/tests/laravel/app/Library/CustomHitsIteratorAggregate.php +++ b/tests/laravel/app/Library/CustomHitsIteratorAggregate.php @@ -17,7 +17,7 @@ public function __construct(array $results, callable $callback = null) $this->callback = $callback; } - public function getIterator() + public function getIterator(): \Traversable { $hits = ['test1', 'test2', 'test3']; diff --git a/tests/laravel/database/factories/BookFactory.php b/tests/laravel/database/factories/BookFactory.php index 6b3882f7..f0ce4291 100644 --- a/tests/laravel/database/factories/BookFactory.php +++ b/tests/laravel/database/factories/BookFactory.php @@ -3,6 +3,7 @@ declare(strict_types=1); use App\Book; +use App\BookWithCustomKey; use Faker\Generator as Faker; $factory->define(Book::class, function (Faker $faker) { @@ -25,3 +26,24 @@ 'title' => "{$faker->sentence} Barcelona {$faker->sentence}", ]; }); + +$factory->define(BookWithCustomKey::class, function (Faker $faker) { + return [ + 'custom_key' => $faker->uuid, + 'title' => $faker->sentence, + 'author' => $faker->name, + 'year' => $faker->year, + ]; +}); + +$factory->state(BookWithCustomKey::class, 'new-york', function (Faker $faker) { + return [ + 'title' => "{$faker->sentence} New-York {$faker->sentence}", + ]; +}); + +$factory->state(BookWithCustomKey::class, 'barcelona', function (Faker $faker) { + return [ + 'title' => "{$faker->sentence} Barcelona {$faker->sentence}", + ]; +}); diff --git a/tests/laravel/database/migrations/2019_15_02_000002_create_tracked_jobs_table.php b/tests/laravel/database/migrations/2019_15_02_000002_create_tracked_jobs_table.php new file mode 100644 index 00000000..b9c6e078 --- /dev/null +++ b/tests/laravel/database/migrations/2019_15_02_000002_create_tracked_jobs_table.php @@ -0,0 +1,39 @@ +table_name = config('trackable-jobs.tables.tracked_jobs', 'tracked_jobs'); + $this->usingUuid = config('trackable-jobs.using_uuid', false); + } + + public function up(): void + { + 
Schema::create($this->table_name, function (Blueprint $table) { + $this->usingUuid + ? $table->uuid('uuid')->primary() + : $table->id(); + $table->string('trackable_id')->index(); + $table->string('trackable_type')->index(); + $table->string('name'); + $table->string('status')->default('queued'); + $table->longText('output')->nullable(); + $table->timestamp('started_at')->nullable(); + $table->timestamp('finished_at')->nullable(); + $table->timestamps(); + }); + } + + public function down(): void + { + Schema::dropIfExists($this->table_name); + } +};