diff --git a/config/elasticsearch.php b/config/elasticsearch.php index 58c3a91b..4f5a5273 100644 --- a/config/elasticsearch.php +++ b/config/elasticsearch.php @@ -6,6 +6,9 @@ 'password' => env('ELASTICSEARCH_PASSWORD'), 'cloud_id' => env('ELASTICSEARCH_CLOUD_ID'), 'api_key' => env('ELASTICSEARCH_API_KEY'), + 'queue' => [ + 'timeout' => env('SCOUT_QUEUE_TIMEOUT'), + ], 'indices' => [ 'mappings' => [ 'default' => [ diff --git a/src/Console/Commands/ImportCommand.php b/src/Console/Commands/ImportCommand.php index 18c8656a..fbdf0675 100644 --- a/src/Console/Commands/ImportCommand.php +++ b/src/Console/Commands/ImportCommand.php @@ -6,6 +6,7 @@ use Illuminate\Console\Command; use Illuminate\Support\Collection; +use Matchish\ScoutElasticSearch\ElasticSearch\Config\Config; use Matchish\ScoutElasticSearch\Jobs\Import; use Matchish\ScoutElasticSearch\Jobs\QueueableJob; use Matchish\ScoutElasticSearch\Searchable\ImportSource; @@ -48,9 +49,11 @@ private function import(string $searchable): void $sourceFactory = app(ImportSourceFactory::class); $source = $sourceFactory::from($searchable); $job = new Import($source); + $job->timeout = Config::queueTimeout(); if (config('scout.queue')) { $job = (new QueueableJob())->chain([$job]); + $job->timeout = Config::queueTimeout(); } $bar = (new ProgressBarFactory($this->output))->create(); diff --git a/src/ElasticSearch/Config/Config.php b/src/ElasticSearch/Config/Config.php index f892eefd..93817fed 100644 --- a/src/ElasticSearch/Config/Config.php +++ b/src/ElasticSearch/Config/Config.php @@ -8,6 +8,7 @@ * @method static password() * @method static elasticCloudId() * @method static apiKey() + * @method static queueTimeout() */ class Config { diff --git a/src/ElasticSearch/Config/Storage.php b/src/ElasticSearch/Config/Storage.php index d583c5c9..87abf2d4 100644 --- a/src/ElasticSearch/Config/Storage.php +++ b/src/ElasticSearch/Config/Storage.php @@ -63,6 +63,14 @@ public function apiKey(): ?string return $this->loadConfig('api_key'); } + /** + * @return ?int + */ + public function queueTimeout(): ?int + { + return (int) $this->loadConfig('queue.timeout') ?: null; + } + /** * @param string $path * @return mixed diff --git a/src/Jobs/Import.php b/src/Jobs/Import.php index 7aa440f0..92c76045 100644 --- a/src/Jobs/Import.php +++ b/src/Jobs/Import.php @@ -21,6 +21,8 @@ final class Import */ private $source; + public ?int $timeout = null; + /** * @param ImportSource $source */ diff --git a/src/Jobs/QueueableJob.php b/src/Jobs/QueueableJob.php index 027ab896..d799f7de 100644 --- a/src/Jobs/QueueableJob.php +++ b/src/Jobs/QueueableJob.php @@ -11,6 +11,8 @@ class QueueableJob implements ShouldQueue use Queueable; use ProgressReportable; + public ?int $timeout = null; + public function handle(): void { } diff --git a/tests/Feature/ImportCommandTest.php b/tests/Feature/ImportCommandTest.php index 9b0c9147..f973ab96 100644 --- a/tests/Feature/ImportCommandTest.php +++ b/tests/Feature/ImportCommandTest.php @@ -8,6 +8,9 @@ use App\BookWithCustomKey; use App\Product; use Illuminate\Support\Facades\Artisan; +use Illuminate\Support\Facades\Bus; +use Matchish\ScoutElasticSearch\Jobs\Import; +use Matchish\ScoutElasticSearch\Jobs\QueueableJob; use stdClass; use Symfony\Component\Console\Output\BufferedOutput; use Tests\IntegrationTestCase; @@ -19,11 +22,11 @@ public function test_import_entites(): void $dispatcher = Product::getEventDispatcher(); Product::unsetEventDispatcher(); - $productsAmount = rand(1, 5); + $productsAmount = random_int(1, 5); factory(Product::class, $productsAmount)->create(); - $productsUnsearchableAmount = rand(1, 5); + $productsUnsearchableAmount = random_int(1, 5); factory(Product::class, $productsUnsearchableAmount)->states(['archive'])->create(); Product::setEventDispatcher($dispatcher); @@ -50,7 +53,7 @@ public function test_import_entites_in_queue(): void $dispatcher = Product::getEventDispatcher(); Product::unsetEventDispatcher(); - $productsAmount = rand(1, 5); + $productsAmount = random_int(1, 5); factory(Product::class, $productsAmount)->create(); Product::setEventDispatcher($dispatcher); @@ -134,7 +137,7 @@ public function test_remove_old_index_after_switching_to_new(): void $dispatcher = Product::getEventDispatcher(); Product::unsetEventDispatcher(); - $productsAmount = rand(1, 5); + $productsAmount = random_int(1, 5); factory(Product::class, $productsAmount)->create(); @@ -145,7 +148,7 @@ public function test_remove_old_index_after_switching_to_new(): void $this->assertFalse($this->elasticsearch->indices()->exists(['index' => 'products_old'])->asBool(), 'Old index must be deleted'); } - public function test_progress_report() + public function test_progress_report(): void { $output = new BufferedOutput(); Artisan::call('scout:import', ['searchable' => [Product::class, Book::class]], $output); @@ -165,7 +168,7 @@ public function test_progress_report() trim($output[30])); } - public function test_progress_report_in_queue() + public function test_progress_report_in_queue(): void { $this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']); @@ -177,4 +180,92 @@ public function test_progress_report_in_queue() $this->assertContains(trans('scout::import.start', ['searchable' => Product::class]), $output); $this->assertContains('[OK] '.trans('scout::import.done.queue', ['searchable' => Product::class]), $output); } + + public function test_queue_timeout_configuration(): void + { + Bus::fake([ + QueueableJob::class, + ]); + + $this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']); + $this->app['config']->set('elasticsearch.queue.timeout', 2); + + $output = new BufferedOutput(); + Artisan::call('scout:import', [], $output); + + $output = array_map('trim', explode("\n", $output->fetch())); + + $this->assertContains(trans('scout::import.start', ['searchable' => Product::class]), $output); + $this->assertContains('[OK] '.trans('scout::import.done.queue', ['searchable' => Product::class]), $output); + + Bus::assertDispatched(function (QueueableJob $job) { + return $job->timeout === 2; + }); + } + + public function test_chained_queue_timeout_configuration(): void + { + Bus::fake([ + Import::class, + ]); + + $this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']); + $this->app['config']->set('elasticsearch.queue.timeout', 2); + + $output = new BufferedOutput(); + Artisan::call('scout:import', [], $output); + + $output = array_map('trim', explode("\n", $output->fetch())); + + $this->assertContains(trans('scout::import.start', ['searchable' => Product::class]), $output); + $this->assertContains('[OK] '.trans('scout::import.done.queue', ['searchable' => Product::class]), $output); + + Bus::assertDispatched(function (Import $job) { + return $job->timeout === 2; + }); + } + + public function test_chained_queue_timeout_configuration_with_null_value(): void + { + Bus::fake([ + Import::class, + ]); + + $this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']); + $this->app['config']->set('elasticsearch.queue.timeout', null); + + $output = new BufferedOutput(); + Artisan::call('scout:import', [], $output); + + $output = array_map('trim', explode("\n", $output->fetch())); + + $this->assertContains(trans('scout::import.start', ['searchable' => Product::class]), $output); + $this->assertContains('[OK] '.trans('scout::import.done.queue', ['searchable' => Product::class]), $output); + + Bus::assertDispatched(function (Import $job) { + return $job->timeout === null; + }); + } + + public function test_chained_queue_timeout_configuration_with_empty_string(): void + { + Bus::fake([ + Import::class, + ]); + + $this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']); + $this->app['config']->set('elasticsearch.queue.timeout', ''); + + $output = new BufferedOutput(); + Artisan::call('scout:import', [], $output); + + $output = array_map('trim', explode("\n", $output->fetch())); + + $this->assertContains(trans('scout::import.start', ['searchable' => Product::class]), $output); + $this->assertContains('[OK] '.trans('scout::import.done.queue', ['searchable' => Product::class]), $output); + + Bus::assertDispatched(function (Import $job) { + return $job->timeout === null; + }); + } }