From 569da28ff2e73d85fa359eb71b09b27612ef8a5b Mon Sep 17 00:00:00 2001 From: Sebastian Riedel Date: Wed, 6 Jun 2018 22:05:13 +0200 Subject: [PATCH 1/5] add a minion_dev_server helper that eliminates the need for separate worker processes during development --- lib/Mojolicious/Plugin/Minion.pm | 28 +++++++++++++++ t/pg_dev_server.t | 58 ++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 t/pg_dev_server.t diff --git a/lib/Mojolicious/Plugin/Minion.pm b/lib/Mojolicious/Plugin/Minion.pm index dc0f0ed3..5c6088cf 100644 --- a/lib/Mojolicious/Plugin/Minion.pm +++ b/lib/Mojolicious/Plugin/Minion.pm @@ -2,12 +2,28 @@ package Mojolicious::Plugin::Minion; use Mojo::Base 'Mojolicious::Plugin'; use Minion; +use Mojo::IOLoop; +use Scalar::Util 'weaken'; sub register { my ($self, $app, $conf) = @_; push @{$app->commands->namespaces}, 'Minion::Command'; my $minion = Minion->new(%$conf)->app($app); $app->helper(minion => sub {$minion}); + $app->helper(minion_dev_server => \&_dev_server); +} + +sub _dev_server { + my ($c, @args) = @_; + + $c->app->hook( + before_server_start => sub { + my ($server, $app) = @_; + return unless $server->isa('Mojo::Server::Daemon'); + $app->minion->missing_after(0)->repair; + Mojo::IOLoop->recurring(1 => sub { $app->minion->perform_jobs(@args) }); + } + ) if $c->app->mode eq 'development'; } 1; @@ -30,6 +46,10 @@ Mojolicious::Plugin::Minion - Minion job queue plugin helper pg => sub { state $pg = Mojo::Pg->new('postgresql://postgres@/test') }; plugin Minion => {Pg => app->pg}; + # Perform jobs in the web server during development + plugin Minion => {Pg => 'postgresql://postgres@/test'}; + app->minion_dev_server; + # Add tasks to your application app->minion->add_task(slow_log => sub { my ($job, $msg) = @_; @@ -67,6 +87,14 @@ Get L object for application. # Perform jobs for testing $app->minion->perform_jobs; +=head2 minion_dev_server + + $app->minion_dev_server; + +In C mode when started with a development web server perform all +jobs from the web server without starting a separate worker process, takes the +same arguments as L. + =head1 METHODS L inherits all methods from diff --git a/t/pg_dev_server.t b/t/pg_dev_server.t new file mode 100644 index 00000000..974d2940 --- /dev/null +++ b/t/pg_dev_server.t @@ -0,0 +1,58 @@ +use Mojo::Base -strict; + +BEGIN { $ENV{MOJO_REACTOR} = 'Mojo::Reactor::Poll' } + +use Test::More; + +plan skip_all => 'set TEST_ONLINE to enable this test' unless $ENV{TEST_ONLINE}; + +use Mojo::IOLoop; +use Mojolicious::Lite; +use Test::Mojo; + +# Isolate tests +require Mojo::Pg; +my $pg = Mojo::Pg->new($ENV{TEST_ONLINE}); +$pg->db->query('drop schema if exists minion_dev_server_test cascade'); +$pg->db->query('create schema minion_dev_server_test'); +plugin Minion => {Pg => $pg->search_path(['minion_dev_server_test'])}; + +# Development server +app->minion_dev_server({queues => ['test']}); + +app->minion->add_task( + add => sub { + my ($job, $first, $second) = @_; + Mojo::IOLoop->next_tick(sub { + $job->finish($first + $second); + Mojo::IOLoop->stop; + }); + Mojo::IOLoop->start; + } +); + +get '/add' => sub { + my $c = shift; + my $id = $c->minion->enqueue( + add => [$c->param('first'), $c->param('second')] => {queue => 'test'}); + $c->render(text => $id); +}; + +get '/result' => sub { + my $c = shift; + $c->render(text => $c->minion->job($c->param('id'))->info->{result}); +}; + +my $t = Test::Mojo->new; + +# Perform jobs automatically +$t->get_ok('/add' => form => {first => 1, second => 2})->status_is(200); +Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop }); +Mojo::IOLoop->start; +$t->get_ok('/result' => form => {id => $t->tx->res->text})->status_is(200) + ->content_is('3'); + +# Clean up once we are done +$pg->db->query('drop schema minion_dev_server_test cascade'); + +done_testing(); From a34826f4d8cc50c4281d5793dd661934231fe05f Mon Sep 17 00:00:00 2001 From: Roy Storey Date: Wed, 7 Aug 2019 19:36:31 +1200 Subject: [PATCH 2/5] use subprocess to run workers --- lib/Mojolicious/Plugin/Minion.pm | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/lib/Mojolicious/Plugin/Minion.pm b/lib/Mojolicious/Plugin/Minion.pm index 5c6088cf..197272c4 100644 --- a/lib/Mojolicious/Plugin/Minion.pm +++ b/lib/Mojolicious/Plugin/Minion.pm @@ -3,7 +3,6 @@ use Mojo::Base 'Mojolicious::Plugin'; use Minion; use Mojo::IOLoop; -use Scalar::Util 'weaken'; sub register { my ($self, $app, $conf) = @_; @@ -21,7 +20,27 @@ sub _dev_server { my ($server, $app) = @_; return unless $server->isa('Mojo::Server::Daemon'); $app->minion->missing_after(0)->repair; - Mojo::IOLoop->recurring(1 => sub { $app->minion->perform_jobs(@args) }); + + # without server event finish, use worker pid going away to finish + my $morbo_worker_pid = $$; + Mojo::IOLoop->subprocess( + sub { + my $subprocess = shift; + # rename process + $0 = 'dev-minion-worker'; + $subprocess->ioloop->recurring( + 1 => sub { $app->minion->perform_jobs(@args); }); + $subprocess->ioloop->recurring( + 1 => sub { shift->stop unless kill 0, $morbo_worker_pid }); + $subprocess->ioloop->start unless $subprocess->ioloop->is_running; + $app->log->debug("$0 $$ finished."); + return 0; + }, + sub { + my ($subprocess, $err, @results) = @_; + $app->log->debug("Subprocess error: $err") and return if $err; + } + ); } ) if $c->app->mode eq 'development'; } From 6967e53ac6d37d2db036c78589977ca9b855d162 Mon Sep 17 00:00:00 2001 From: Roy Storey Date: Mon, 12 Aug 2019 19:57:56 +1200 Subject: [PATCH 3/5] better option, continually running worker, more like the command --- lib/Mojolicious/Plugin/Minion.pm | 65 +++++-- t/pg_dev_server.t | 315 +++++++++++++++++++++++++++++-- 2 files changed, 355 insertions(+), 25 deletions(-) diff --git a/lib/Mojolicious/Plugin/Minion.pm b/lib/Mojolicious/Plugin/Minion.pm index 197272c4..23997b20 100644 --- a/lib/Mojolicious/Plugin/Minion.pm +++ b/lib/Mojolicious/Plugin/Minion.pm @@ -21,30 +21,67 @@ sub _dev_server { return unless $server->isa('Mojo::Server::Daemon'); $app->minion->missing_after(0)->repair; - # without server event finish, use worker pid going away to finish - my $morbo_worker_pid = $$; + # without server event finish, use server pid going away to set finished + my $server_pid = $$; Mojo::IOLoop->subprocess( sub { - my $subprocess = shift; - # rename process - $0 = 'dev-minion-worker'; - $subprocess->ioloop->recurring( - 1 => sub { $app->minion->perform_jobs(@args); }); - $subprocess->ioloop->recurring( - 1 => sub { shift->stop unless kill 0, $morbo_worker_pid }); - $subprocess->ioloop->start unless $subprocess->ioloop->is_running; - $app->log->debug("$0 $$ finished."); - return 0; + _run_worker($app->minion->worker, $app->log, $server_pid, @args); }, sub { - my ($subprocess, $err, @results) = @_; - $app->log->debug("Subprocess error: $err") and return if $err; + $app->log->warn("Subprocess error: $_[1]") and return if $_[1]; } ); } ) if $c->app->mode eq 'development'; } +sub _job_spawned { + my ($job, $pid) = @_; + my ($id, $task) = ($job->id, $job->task); + $job->app->log->debug( + qq{Process $pid is performing job "$id" with task "$task"}); +} + +sub _run_worker { + my ($worker, $log, $parent_pid, $args) = (shift, shift, shift, shift || {}); + + # remarkably similar to Minion::Worker->run, but some important differences + my $status = $worker->status($args)->status; + $status->{command_interval} //= 10; + $status->{dequeue_timeout} //= 5; + $status->{heartbeat_interval} //= 300; + $status->{jobs} //= 4; + $status->{queues} ||= ['default']; + $status->{performed} //= 0; + $status->{repair_interval} //= 21600; + $status->{repair_interval} -= int rand $status->{repair_interval} / 2; + $worker->on(dequeue => sub { pop->once(spawn => \&_job_spawned) }); + + local $SIG{CHLD} = sub { }; + my $commands = $worker->commands; + my $kill = sub { + return unless grep { ($_[1] // '') eq $_ } qw(INT KILL USR1 USR2); + $worker->{jobs}{$_[2]}->kill($_[1]) if $worker->{jobs}{$_[2] // ''}; + }; + local $commands->{jobs} + = sub { $status->{jobs} = $_[1] if ($_[1] // '') =~ /^\d+$/ }; + local $commands->{kill} = $kill; + local $commands->{stop} = sub { $kill->('KILL', $_[1]) }; + eval { + $log->info("Worker $$ started"); + + # $self->{last_repair} ||= 0 in _work() deletes the worker otherwise + $worker->minion->missing_after(180); + $worker->_work + until ($worker->{finished} = !kill 0, $parent_pid) + && !keys %{$worker->{jobs}}; + }; + $log->info("Worker $$ stopped"); + my $err = $@; + $worker->unregister; + $log->fatal($err) if $err; +} + 1; =encoding utf8 diff --git a/t/pg_dev_server.t b/t/pg_dev_server.t index 974d2940..5589b46b 100644 --- a/t/pg_dev_server.t +++ b/t/pg_dev_server.t @@ -9,28 +9,147 @@ plan skip_all => 'set TEST_ONLINE to enable this test' unless $ENV{TEST_ONLINE}; use Mojo::IOLoop; use Mojolicious::Lite; use Test::Mojo; +use Mojo::Util qw{monkey_patch steady_time}; +use POSIX 'WNOHANG'; # Isolate tests require Mojo::Pg; my $pg = Mojo::Pg->new($ENV{TEST_ONLINE}); $pg->db->query('drop schema if exists minion_dev_server_test cascade'); $pg->db->query('create schema minion_dev_server_test'); + +# make Test::Mojo subprocess aware and record the correct number of tests +Test::Mojo->attr(['subprocess']); + +# majority of the tests run in a subprocess so proxy back to parent +monkey_patch 'Test::Mojo', '_test' => sub { + my ($self, $name, @args) = @_; + local $Test::Builder::Level = $Test::Builder::Level + 2; + + my ($stdout, $stderr) = ('', ''); + local (*STDOUT, *STDERR); + open STDOUT, '>', \$stdout; + open STDERR, '>', \$stderr; + Test::More->builder->output(\*STDOUT); + Test::More->builder->failure_output(\*STDERR); + my $sucess = $self->success(!!Test::More->can($name)->(@args)); + Test::More->builder->reset_outputs; + + my $subprocess = $self->subprocess or return $sucess; + $subprocess->progress({id => 0, stderr => $stderr}) if $stderr; + $subprocess->progress({id => 1, stdout => $stdout}); + return $sucess; + }, + + # often need to wait for a job to finish + 'wait_for_job' => sub { + my ($self, $job_id, $interval) = (shift, shift, shift || 5); + my ($limit, $jobs) = (steady_time + $interval); + do { + $self->subprocess->ioloop->timer(1 => sub { shift->stop }); + $self->subprocess->ioloop->start; + $self->app->log->info("waiting"); + $jobs = $self->app->minion->backend->list_jobs(0, 1, {ids => [$job_id]}); + } until ($jobs->{jobs}[0]{state} eq 'finished' || $limit < steady_time); + }; + +my $minion_worker_pids = []; + +# Mojo::IOLoop::Subprocess progress callback +sub progress_reporter { + my ($subprocess, $data) = @_; + + if (my $cmd = $data->{command}) { + if ($cmd eq 'worker_pid') { + my $pid = $data->{pid}; + return diag "invalid PID" if !defined($pid) || $pid < 0; + push @$minion_worker_pids, $data->{pid}; + } + else { + die "unknown command '$cmd'"; + } + return; + } + + my ($id, $stdout, $stderr) = @$data{qw{id stdout stderr}}; + record_test() if $id; + print STDOUT "$stdout" if $stdout; + print STDERR "$stderr" if $stderr; +} + +# updates Test::More's idea of how many tests +sub record_test { + state $test_number //= 0; + Test::More->builder->current_test($test_number++); + return shift(); +} + +# run a server in subprocess for reasons +sub run_server_ok { + my ($run, $finished, $until) = (shift, 0); + $minion_worker_pids = []; + + local $SIG{CHLD} = 'DEFAULT'; + Mojo::IOLoop->subprocess( + $run, + sub { + my ($subprocess, $err, $results) = @_; + + # end testing if this is the case + die "not ok $err" if ($err); + is_deeply $results, ['finished', $subprocess->pid], + record_test('correct result'); + $finished = 1; + } + )->on( + spawn => sub { + $_[0]->on(progress => \&progress_reporter); + my $pid = $_[0]->pid; + + # still local - so that server goes away triggering minion worker's + # finished state with kill 0, $parent + $SIG{CHLD} = sub { waitpid $pid, WNOHANG; }; + } + ); + + $until = Mojo::IOLoop->recurring( + 1 => sub { $_[0]->remove($until) and $_[0]->stop if $finished == 1; }); + Mojo::IOLoop->start unless Mojo::IOLoop->is_running; + is $finished, 1, record_test('run_server_ok'); +} + plugin Minion => {Pg => $pg->search_path(['minion_dev_server_test'])}; +plugin 'Minion::Admin' => {}; + +app->mode('testing'); + # Development server app->minion_dev_server({queues => ['test']}); app->minion->add_task( add => sub { my ($job, $first, $second) = @_; - Mojo::IOLoop->next_tick(sub { - $job->finish($first + $second); - Mojo::IOLoop->stop; - }); + Mojo::IOLoop->next_tick( + sub { + $job->finish($first + $second); + Mojo::IOLoop->stop; + } + ); Mojo::IOLoop->start; } ); +app->minion->add_task( + block => sub { + Mojo::IOLoop->timer(10 => sub { Mojo::IOLoop->stop }); + Mojo::IOLoop->start; + shift->finish("$$"); + } +); + +get '/' => sub { shift->render(text => 'index') }; + get '/add' => sub { my $c = shift; my $id = $c->minion->enqueue( @@ -38,19 +157,193 @@ get '/add' => sub { $c->render(text => $id); }; +get '/add-many' => sub { + my $c = shift; + my $ids = []; + push @$ids, + $c->minion->enqueue( + add => [$c->param('first'), $c->param('second')] => {queue => 'test'}) + for 0 .. 99; + $c->render(json => $ids); +}; + +get '/block' => sub { + my $c = shift; + my $id = $c->minion->enqueue(block => [] => {queue => 'test'}); + $c->render(text => $id); +}; + get '/result' => sub { my $c = shift; $c->render(text => $c->minion->job($c->param('id'))->info->{result}); }; -my $t = Test::Mojo->new; +get '/status' => sub { + my $c = shift; + $c->render(text => $c->minion->job($c->param('id'))->info->{state}); +}; + +get '/workers' => sub { + my $c = shift; + $c->render(json => $c->minion->backend->list_workers(0, 100, {})); +}; + + +run_server_ok sub { + my ($subprocess) = @_; + my $t = Test::Mojo->new; + $t->subprocess($subprocess)->get_ok('/')->status_is(200); + $t->get_ok('/')->status_is(200) for 0 .. 1; + + return ['finished', $$]; +}; + +run_server_ok sub { + my ($subprocess) = @_; + my $t = Test::Mojo->new; + $t->app->mode('testing'); + $t->app->minion_dev_server({queues => ['test']}); + + $t->subprocess($subprocess) + ->get_ok('/add' => form => {first => 1, second => 2}) + ->status_is(200, 'job enqueued'); + $subprocess->ioloop->timer(2 => sub { shift->stop }); + $subprocess->ioloop->start; + $t->get_ok('/result' => form => {id => $t->tx->res->text}) + ->status_is(404, 'this job has not dequeued or run'); + + return ['finished', $$]; +}; + +run_server_ok sub { + my ($subprocess) = @_; + my $t = Test::Mojo->new; + $0 = 'test-server'; + $t->app->mode('development'); + $t->app->minion_dev_server( + {queues => ['test'], jobs => 1, dequeue_timeout => 0,}); + $t->subprocess($subprocess); + + # Perform jobs automatically + $t->get_ok('/add' => form => {first => 1, second => 2})->status_is(200); + + # wait for job to finish + $t->wait_for_job($t->tx->res->text, 2); + $t->get_ok('/result' => form => {id => $t->tx->res->text})->status_is(200) + ->content_is('3', 'simple 1 + 2'); + + $t->get_ok('/block')->status_is(200) + ->content_like(qr/^[0-9]+$/, 'numeric id'); + my $job_id = $t->tx->res->text; + + # test worker has started - one of two + $t->get_ok('/workers')->status_is(200) + ->json_is('/total', 2, 'have 2 workers (blocking + non blocking)') + ->json_like('/workers/0/pid', qr/^[0-9]+$/, 'with a process id'); + my $workers = $t->tx->res->json('/workers'); + $subprocess->progress({command => 'worker_pid', pid => $_->{pid}}) + for @$workers; + + # get status while job is running + $t->get_ok('/status' => form => {id => $job_id})->status_is(200) + ->content_is('active', 'request not blocked') + for 0 .. 1; + + # wait for job to finish and check + my $before = steady_time; + $t->wait_for_job($job_id, 30); + $t->_test('cmp_ok', (steady_time - $before), '>', 5, "waited this long"); + + $t->get_ok('/result' => form => {id => $job_id})->status_is(200) + ->content_like(qr/^[0-9]+$/, 'result is process id or job process'); + + $t->get_ok('/add' => form => {first => 40, second => 2})->status_is(200); + $t->wait_for_job($t->tx->res->text); + $t->get_ok('/result' => form => {id => $t->tx->res->text})->status_is(200) + ->content_is('42', 'simple 40 + 2'); + + return ['finished', $$]; +}; + +ok @$minion_worker_pids > 0, record_test("saved a worker"); +for my $pid (@$minion_worker_pids) { + is kill(0, $pid), 0, record_test("worker (pid=$pid) finished with server"); +} + +run_server_ok sub { + my ($subprocess) = @_; + my $t = Test::Mojo->new; + $0 = 'fast-finishing'; + $t->app->mode('development'); + $t->app->minion_dev_server({queues => ['test'], jobs => 2}); + $t->subprocess($subprocess); + my $input = [map { [int(rand(200)), 1] } 0 .. 7]; + + for my $i (@$input) { + $t->get_ok('/add-many' => form => {first => $i->[0], second => $i->[1]}) + ->status_is(200); + } + + # test worker has started - one of two + $t->get_ok('/workers')->status_is(200) + ->json_is('/total', 2, 'have 2 workers (blocking + non blocking)') + ->json_like('/workers/0/pid', qr/^[0-9]+$/, 'with a process id'); + my $workers = $t->tx->res->json('/workers'); + $subprocess->progress({command => 'worker_pid', pid => $_->{pid}}) + for @$workers; + + return ['finished', $$]; +}; + +my $result = app->minion->backend->list_jobs(0, 1, {states => ['inactive']}); +my $jobs = $result->{jobs}; +cmp_ok $result->{total}, '>=', 1, + record_test("left inactive jobs - fast finished"); + +cmp_ok @$minion_worker_pids, '>', 0, record_test("saved a worker"); +for my $pid (@$minion_worker_pids) { + is kill(0, $pid), 0, record_test("worker (pid=$pid) finished with server"); +} + +run_server_ok sub { + my ($subprocess) = @_; + my $t = Test::Mojo->new; + $t->app->mode('development'); + $t->app->minion_dev_server( + { + queues => ['test'], + jobs => 100, + heartbeat_interval => 1, + dequeue_timeout => 0, + } + ); + + # run the inactive jobs and add a few more + $t->subprocess($subprocess) + ->get_ok('/add' => form => {first => 1, second => 2}) + ->status_is(200, 'job enqueued'); + my $job_id = $t->tx->res->text; + $t->wait_for_job($job_id, 60); + $t->get_ok('/result' => form => {id => $job_id}) + ->status_is(200, 'this job ran')->content_is(3, 'add'); + + # wait for worker heartbeat update status + $subprocess->ioloop->timer(5 => sub { shift->stop }); + $subprocess->ioloop->start; + my $results = $t->app->minion->backend->list_workers(); + $t->_test('is', $results->{total}, 2, 'two workers'); + for my $w (@{$results->{workers}}) { + $t->_test('is', $w->{status}{dequeue_timeout}, 0, 'configuration'); + $t->_test('is', $w->{status}{heartbeat_interval}, 1, 'configuration'); + $t->_test('is', $w->{status}{jobs}, 100, 'configuration'); + $t->_test('cmp_ok', $w->{status}{performed}, '>', 1, 'heartbeat'); + $t->_test('is_deeply', $w->{status}{queues}, ["test"], 'configuration'); + } + + return ['finished', $$]; +}; -# Perform jobs automatically -$t->get_ok('/add' => form => {first => 1, second => 2})->status_is(200); -Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop }); -Mojo::IOLoop->start; -$t->get_ok('/result' => form => {id => $t->tx->res->text})->status_is(200) - ->content_is('3'); +is $SIG{CHLD}, undef, record_test('local sig chld'); # Clean up once we are done $pg->db->query('drop schema minion_dev_server_test cascade'); From c74e5f55c3b8e751a5e1ae77490380e1cea20a89 Mon Sep 17 00:00:00 2001 From: Roy Storey Date: Tue, 13 Aug 2019 00:05:19 +1200 Subject: [PATCH 4/5] minor pod change --- lib/Mojolicious/Plugin/Minion.pm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Mojolicious/Plugin/Minion.pm b/lib/Mojolicious/Plugin/Minion.pm index 23997b20..85749c7d 100644 --- a/lib/Mojolicious/Plugin/Minion.pm +++ b/lib/Mojolicious/Plugin/Minion.pm @@ -149,7 +149,7 @@ Get L object for application. In C mode when started with a development web server perform all jobs from the web server without starting a separate worker process, takes the -same arguments as L. +same arguments as L. =head1 METHODS From 1aec62fa3cfdce034ee7c27373dcc7585a442b5e Mon Sep 17 00:00:00 2001 From: Roy Storey Date: Tue, 13 Aug 2019 00:29:38 +1200 Subject: [PATCH 5/5] cleaner code --- lib/Mojolicious/Plugin/Minion.pm | 49 ++++++-------------------------- 1 file changed, 8 insertions(+), 41 deletions(-) diff --git a/lib/Mojolicious/Plugin/Minion.pm b/lib/Mojolicious/Plugin/Minion.pm index 85749c7d..03f9aa09 100644 --- a/lib/Mojolicious/Plugin/Minion.pm +++ b/lib/Mojolicious/Plugin/Minion.pm @@ -25,7 +25,14 @@ sub _dev_server { my $server_pid = $$; Mojo::IOLoop->subprocess( sub { - _run_worker($app->minion->worker, $app->log, $server_pid, @args); + $app->minion->on(worker => sub { + my ($minion, $worker) = @_; + $minion->missing_after(180); + $worker->status(@args); + $worker->on(dequeue => sub { pop->once(spawn => \&_job_spawned) }); + $worker->on(wait => sub { shift->{finished} = !kill 0, $server_pid}); + }); + $app->minion->worker->run; }, sub { $app->log->warn("Subprocess error: $_[1]") and return if $_[1]; @@ -42,46 +49,6 @@ sub _job_spawned { qq{Process $pid is performing job "$id" with task "$task"}); } -sub _run_worker { - my ($worker, $log, $parent_pid, $args) = (shift, shift, shift, shift || {}); - - # remarkably similar to Minion::Worker->run, but some important differences - my $status = $worker->status($args)->status; - $status->{command_interval} //= 10; - $status->{dequeue_timeout} //= 5; - $status->{heartbeat_interval} //= 300; - $status->{jobs} //= 4; - $status->{queues} ||= ['default']; - $status->{performed} //= 0; - $status->{repair_interval} //= 21600; - $status->{repair_interval} -= int rand $status->{repair_interval} / 2; - $worker->on(dequeue => sub { pop->once(spawn => \&_job_spawned) }); - - local $SIG{CHLD} = sub { }; - my $commands = $worker->commands; - my $kill = sub { - return unless grep { ($_[1] // '') eq $_ } qw(INT KILL USR1 USR2); - $worker->{jobs}{$_[2]}->kill($_[1]) if $worker->{jobs}{$_[2] // ''}; - }; - local $commands->{jobs} - = sub { $status->{jobs} = $_[1] if ($_[1] // '') =~ /^\d+$/ }; - local $commands->{kill} = $kill; - local $commands->{stop} = sub { $kill->('KILL', $_[1]) }; - eval { - $log->info("Worker $$ started"); - - # $self->{last_repair} ||= 0 in _work() deletes the worker otherwise - $worker->minion->missing_after(180); - $worker->_work - until ($worker->{finished} = !kill 0, $parent_pid) - && !keys %{$worker->{jobs}}; - }; - $log->info("Worker $$ stopped"); - my $err = $@; - $worker->unregister; - $log->fatal($err) if $err; -} - 1; =encoding utf8