Skip to content

Commit

Permalink
Merge pull request #230 from hkulekci/queue-timeout-configuration
Browse files Browse the repository at this point in the history
a config initialized for queue timeout
  • Loading branch information
matchish authored Feb 17, 2023
2 parents d8b0737 + ea3e2f4 commit 7b9fc92
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 6 deletions.
3 changes: 3 additions & 0 deletions config/elasticsearch.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
'password' => env('ELASTICSEARCH_PASSWORD'),
'cloud_id' => env('ELASTICSEARCH_CLOUD_ID'),
'api_key' => env('ELASTICSEARCH_API_KEY'),
'queue' => [
'timeout' => env('SCOUT_QUEUE_TIMEOUT'),
],
'indices' => [
'mappings' => [
'default' => [
Expand Down
3 changes: 3 additions & 0 deletions src/Console/Commands/ImportCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Illuminate\Console\Command;
use Illuminate\Support\Collection;
use Matchish\ScoutElasticSearch\ElasticSearch\Config\Config;
use Matchish\ScoutElasticSearch\Jobs\Import;
use Matchish\ScoutElasticSearch\Jobs\QueueableJob;
use Matchish\ScoutElasticSearch\Searchable\ImportSource;
Expand Down Expand Up @@ -48,9 +49,11 @@ private function import(string $searchable): void
$sourceFactory = app(ImportSourceFactory::class);
$source = $sourceFactory::from($searchable);
$job = new Import($source);
$job->timeout = Config::queueTimeout();

if (config('scout.queue')) {
$job = (new QueueableJob())->chain([$job]);
$job->timeout = Config::queueTimeout();
}

$bar = (new ProgressBarFactory($this->output))->create();
Expand Down
1 change: 1 addition & 0 deletions src/ElasticSearch/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* @method static password()
* @method static elasticCloudId()
* @method static apiKey()
* @method static queueTimeout()
*/
class Config
{
Expand Down
8 changes: 8 additions & 0 deletions src/ElasticSearch/Config/Storage.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ public function apiKey(): ?string
return $this->loadConfig('api_key');
}

/**
* @return ?int
*/
public function queueTimeout(): ?int
{
return (int) $this->loadConfig('queue.timeout') ?: null;
}

/**
* @param string $path
* @return mixed
Expand Down
2 changes: 2 additions & 0 deletions src/Jobs/Import.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ final class Import
*/
private $source;

public ?int $timeout = null;

/**
* @param ImportSource $source
*/
Expand Down
2 changes: 2 additions & 0 deletions src/Jobs/QueueableJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ class QueueableJob implements ShouldQueue
use Queueable;
use ProgressReportable;

public ?int $timeout = null;

public function handle(): void
{
}
Expand Down
103 changes: 97 additions & 6 deletions tests/Feature/ImportCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
use App\BookWithCustomKey;
use App\Product;
use Illuminate\Support\Facades\Artisan;
use Illuminate\Support\Facades\Bus;
use Matchish\ScoutElasticSearch\Jobs\Import;
use Matchish\ScoutElasticSearch\Jobs\QueueableJob;
use stdClass;
use Symfony\Component\Console\Output\BufferedOutput;
use Tests\IntegrationTestCase;
Expand All @@ -19,11 +22,11 @@ public function test_import_entites(): void
$dispatcher = Product::getEventDispatcher();
Product::unsetEventDispatcher();

$productsAmount = rand(1, 5);
$productsAmount = random_int(1, 5);

factory(Product::class, $productsAmount)->create();

$productsUnsearchableAmount = rand(1, 5);
$productsUnsearchableAmount = random_int(1, 5);
factory(Product::class, $productsUnsearchableAmount)->states(['archive'])->create();

Product::setEventDispatcher($dispatcher);
Expand All @@ -50,7 +53,7 @@ public function test_import_entites_in_queue(): void
$dispatcher = Product::getEventDispatcher();
Product::unsetEventDispatcher();

$productsAmount = rand(1, 5);
$productsAmount = random_int(1, 5);
factory(Product::class, $productsAmount)->create();

Product::setEventDispatcher($dispatcher);
Expand Down Expand Up @@ -134,7 +137,7 @@ public function test_remove_old_index_after_switching_to_new(): void
$dispatcher = Product::getEventDispatcher();
Product::unsetEventDispatcher();

$productsAmount = rand(1, 5);
$productsAmount = random_int(1, 5);

factory(Product::class, $productsAmount)->create();

Expand All @@ -145,7 +148,7 @@ public function test_remove_old_index_after_switching_to_new(): void
$this->assertFalse($this->elasticsearch->indices()->exists(['index' => 'products_old'])->asBool(), 'Old index must be deleted');
}

public function test_progress_report()
public function test_progress_report(): void
{
$output = new BufferedOutput();
Artisan::call('scout:import', ['searchable' => [Product::class, Book::class]], $output);
Expand All @@ -165,7 +168,7 @@ public function test_progress_report()
trim($output[30]));
}

public function test_progress_report_in_queue()
public function test_progress_report_in_queue(): void
{
$this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']);

Expand All @@ -177,4 +180,92 @@ public function test_progress_report_in_queue()
$this->assertContains(trans('scout::import.start', ['searchable' => Product::class]), $output);
$this->assertContains('[OK] '.trans('scout::import.done.queue', ['searchable' => Product::class]), $output);
}

public function test_queue_timeout_configuration(): void
{
Bus::fake([
QueueableJob::class,
]);

$this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']);
$this->app['config']->set('elasticsearch.queue.timeout', 2);

$output = new BufferedOutput();
Artisan::call('scout:import', [], $output);

$output = array_map('trim', explode("\n", $output->fetch()));

$this->assertContains(trans('scout::import.start', ['searchable' => Product::class]), $output);
$this->assertContains('[OK] '.trans('scout::import.done.queue', ['searchable' => Product::class]), $output);

Bus::assertDispatched(function (QueueableJob $job) {
return $job->timeout === 2;
});
}

public function test_chained_queue_timeout_configuration(): void
{
Bus::fake([
Import::class,
]);

$this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']);
$this->app['config']->set('elasticsearch.queue.timeout', 2);

$output = new BufferedOutput();
Artisan::call('scout:import', [], $output);

$output = array_map('trim', explode("\n", $output->fetch()));

$this->assertContains(trans('scout::import.start', ['searchable' => Product::class]), $output);
$this->assertContains('[OK] '.trans('scout::import.done.queue', ['searchable' => Product::class]), $output);

Bus::assertDispatched(function (Import $job) {
return $job->timeout === 2;
});
}

public function test_chained_queue_timeout_configuration_with_null_value(): void
{
Bus::fake([
Import::class,
]);

$this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']);
$this->app['config']->set('elasticsearch.queue.timeout', null);

$output = new BufferedOutput();
Artisan::call('scout:import', [], $output);

$output = array_map('trim', explode("\n", $output->fetch()));

$this->assertContains(trans('scout::import.start', ['searchable' => Product::class]), $output);
$this->assertContains('[OK] '.trans('scout::import.done.queue', ['searchable' => Product::class]), $output);

Bus::assertDispatched(function (Import $job) {
return $job->timeout === null;
});
}

public function test_chained_queue_timeout_configuration_with_empty_string(): void
{
Bus::fake([
Import::class,
]);

$this->app['config']->set('scout.queue', ['connection' => 'sync', 'queue' => 'scout']);
$this->app['config']->set('elasticsearch.queue.timeout', '');

$output = new BufferedOutput();
Artisan::call('scout:import', [], $output);

$output = array_map('trim', explode("\n", $output->fetch()));

$this->assertContains(trans('scout::import.start', ['searchable' => Product::class]), $output);
$this->assertContains('[OK] '.trans('scout::import.done.queue', ['searchable' => Product::class]), $output);

Bus::assertDispatched(function (Import $job) {
return $job->timeout === null;
});
}
}

0 comments on commit 7b9fc92

Please sign in to comment.