Skip to content

Commit

Permalink
Add mutex guard capability
Browse files Browse the repository at this point in the history
  • Loading branch information
marioroy committed Sep 13, 2023
1 parent 4841c8f commit fdb18b2
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 43 deletions.
8 changes: 8 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@

Revision history for Perl module MCE.

1.889 Wed Sep 13 15:00:00 EST 2023

* Add Android support. Thank you, Dimitrios Kechagias.
* Improve mutex synchronize (aka enter) with guard capability.
Thank you, José Joaquín Atria.
* Fix mutex re-entrant lock on the Windows platform.
* Add mutex guard_lock method.

1.888 Wed Jun 21 17:00:00 EST 2023

* Fix typos caught by lintian. Thank you, Étienne Mollier.
Expand Down
7 changes: 7 additions & 0 deletions lib/MCE/Mutex.pm
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ implementations.
Current API available since 1.822.
=head2 $guard = $mutex->guard_lock ( void )
This method calls C<lock> and returns a guard object. When the guard object is
destroyed, it automatically calls C<unlock>.
Current API available since 1.889.
=head2 $mutex->unlock ( void )
Releases the lock. A held lock by an exiting process or thread is released
Expand Down
37 changes: 25 additions & 12 deletions lib/MCE/Mutex/Channel.pm
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@ sub CLONE {
$tid = threads->tid if $INC{'threads.pm'};
}

sub MCE::Mutex::Channel::_guard::DESTROY {
my ($pid, $obj) = @{ $_[0] };
CORE::syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0 if $obj->{ $pid };

return;
}

sub DESTROY {
my ($pid, $obj) = ($tid ? $$ .'.'. $tid : $$, @_);

CORE::syswrite($obj->{_w_sock}, '0'), $obj->{$pid } = 0 if $obj->{$pid };
CORE::syswrite($obj->{_r_sock}, '0'), $obj->{$pid.'b'} = 0 if $obj->{$pid.'b'};

Expand Down Expand Up @@ -90,14 +96,20 @@ sub new {
sub lock {
my ($pid, $obj) = ($tid ? $$ .'.'. $tid : $$, shift);

CORE::lock($obj->{_t_lock}), MCE::Util::_sock_ready($obj->{_r_sock})
if $is_MSWin32;
MCE::Util::_sysread($obj->{_r_sock}, my($b), 1), $obj->{ $pid } = 1
unless $obj->{ $pid };
unless ($obj->{ $pid }) {
CORE::lock($obj->{_t_lock}), MCE::Util::_sock_ready($obj->{_r_sock})
if $is_MSWin32;
MCE::Util::_sysread($obj->{_r_sock}, my($b), 1), $obj->{ $pid } = 1;
}

return;
}

sub guard_lock {
&lock(@_);
bless([ $tid ? $$ .'.'. $tid : $$, $_[0] ], MCE::Mutex::Channel::_guard::);
}

*lock_exclusive = \&lock;
*lock_shared = \&lock;

Expand All @@ -117,17 +129,16 @@ sub synchronize {
return unless ref($code) eq 'CODE';

# lock, run, unlock - inlined for performance
CORE::lock($obj->{_t_lock}), MCE::Util::_sock_ready($obj->{_r_sock})
if $is_MSWin32;
MCE::Util::_sysread($obj->{_r_sock}, $b, 1), $obj->{ $pid } = 1
unless $obj->{ $pid };

my $guard = bless([ $pid, $obj ], MCE::Mutex::Channel::_guard::);
unless ($obj->{ $pid }) {
CORE::lock($obj->{_t_lock}), MCE::Util::_sock_ready($obj->{_r_sock})
if $is_MSWin32;
MCE::Util::_sysread($obj->{_r_sock}, $b, 1), $obj->{ $pid } = 1;
}
(defined wantarray)
? @ret = wantarray ? $code->(@_) : scalar $code->(@_)
: $code->(@_);

CORE::syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0;

return wantarray ? @ret : $ret[-1];
}

Expand Down Expand Up @@ -194,6 +205,8 @@ The API is described in L<MCE::Mutex>.
=item lock_shared
=item guard_lock
=item unlock
=item synchronize
Expand Down
38 changes: 27 additions & 11 deletions lib/MCE/Mutex/Channel2.pm
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ sub CLONE {
$tid = threads->tid() if $INC{'threads.pm'};
}

sub MCE::Mutex::Channel2::_guard::DESTROY {
my ($pid, $obj) = @{ $_[0] };
CORE::syswrite($obj->{_r_sock}, '0'), $obj->{$pid.'b'} = 0 if $obj->{$pid.'b'};

return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Public methods.
Expand Down Expand Up @@ -56,14 +63,20 @@ sub new {
sub lock2 {
my ($pid, $obj) = ($tid ? $$ .'.'. $tid : $$, shift);

CORE::lock($obj->{_t_lock2}), MCE::Util::_sock_ready($obj->{_w_sock})
if $is_MSWin32;
MCE::Util::_sysread($obj->{_w_sock}, my($b), 1), $obj->{ $pid.'b' } = 1
unless $obj->{ $pid.'b' };
unless ($obj->{ $pid.'b' }) {
CORE::lock($obj->{_t_lock2}), MCE::Util::_sock_ready($obj->{_w_sock})
if $is_MSWin32;
MCE::Util::_sysread($obj->{_w_sock}, my($b), 1), $obj->{ $pid.'b' } = 1;
}

return;
}

sub guard_lock2 {
&lock2(@_);
bless([ $tid ? $$ .'.'. $tid : $$, $_[0] ], MCE::Mutex::Channel2::_guard::);
}

*lock_exclusive2 = \&lock2;
*lock_shared2 = \&lock2;

Expand All @@ -83,17 +96,16 @@ sub synchronize2 {
return unless ref($code) eq 'CODE';

# lock, run, unlock - inlined for performance
CORE::lock($obj->{_t_lock2}), MCE::Util::_sock_ready($obj->{_w_sock})
if $is_MSWin32;
MCE::Util::_sysread($obj->{_w_sock}, $b, 1), $obj->{ $pid.'b' } = 1
unless $obj->{ $pid.'b' };

my $guard = bless([ $pid, $obj ], MCE::Mutex::Channel2::_guard::);
unless ($obj->{ $pid.'b' }) {
CORE::lock($obj->{_t_lock2}), MCE::Util::_sock_ready($obj->{_w_sock})
if $is_MSWin32;
MCE::Util::_sysread($obj->{_w_sock}, $b, 1), $obj->{ $pid.'b' } = 1;
}
(defined wantarray)
? @ret = wantarray ? $code->(@_) : scalar $code->(@_)
: $code->(@_);

CORE::syswrite($obj->{_r_sock}, '0'), $obj->{ $pid.'b' } = 0;

return wantarray ? @ret : $ret[-1];
}

Expand Down Expand Up @@ -171,6 +183,8 @@ The API is described in L<MCE::Mutex>.
=item lock_shared
=item guard_lock
=item unlock
=item synchronize
Expand All @@ -191,6 +205,8 @@ The API is described in L<MCE::Mutex>.
=item lock_shared2
=item guard_lock2
=item unlock2
=item synchronize2
Expand Down
24 changes: 18 additions & 6 deletions lib/MCE/Mutex/Flock.pm
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ sub CLONE {
$tid = threads->tid() if $INC{'threads.pm'};
}

sub MCE::Mutex::Flock::_guard::DESTROY {
my ($pid, $obj) = @{ $_[0] };
CORE::flock ($obj->{_fh}, LOCK_UN), $obj->{ $pid } = 0 if $obj->{ $pid };

return;
}

sub DESTROY {
my ($pid, $obj) = ($tid ? $$ .'.'. $tid : $$, @_);

$obj->unlock(), close(delete $obj->{_fh}) if $obj->{ $pid };
unlink $obj->{path} if ($obj->{_init} && $obj->{_init} eq $pid);

Expand Down Expand Up @@ -123,6 +129,11 @@ sub lock {
return;
}

sub guard_lock {
&lock(@_);
bless([ $tid ? $$ .'.'. $tid : $$, $_[0] ], MCE::Mutex::Flock::_guard::);
}

*lock_exclusive = \&lock;

sub lock_shared {
Expand Down Expand Up @@ -153,15 +164,14 @@ sub synchronize {
$obj->_open() unless exists $obj->{ $pid };

# lock, run, unlock - inlined for performance
CORE::flock ($obj->{_fh}, LOCK_EX), $obj->{ $pid } = 1
unless $obj->{ $pid };

my $guard = bless([ $pid, $obj ], MCE::Mutex::Flock::_guard::);
unless ($obj->{ $pid }) {
CORE::flock ($obj->{_fh}, LOCK_EX), $obj->{ $pid } = 1;
}
(defined wantarray)
? @ret = wantarray ? $code->(@_) : scalar $code->(@_)
: $code->(@_);

CORE::flock ($obj->{_fh}, LOCK_UN), $obj->{ $pid } = 0;

return wantarray ? @ret : $ret[-1];
}

Expand Down Expand Up @@ -219,6 +229,8 @@ The API is described in L<MCE::Mutex>.
=item lock_shared
=item guard_lock
=item unlock
=item synchronize
Expand Down
36 changes: 28 additions & 8 deletions xt/channel2_lock.t
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,50 @@ my $mutex = MCE::Mutex->new( impl => 'Channel2' );

is($mutex->impl(), 'Channel2', 'implementation name');

sub task {
sub task1a {
$mutex->lock_exclusive;
sleep(1) for 1..2;
$mutex->unlock;
}
sub task2 {
$mutex->lock_exclusive2;
sub task1b {
my $guard = $mutex->guard_lock;
sleep(1) for 1..2;
$mutex->unlock2;
}

sub spawn {
sub spawn1 {
my ($i) = @_;
my $pid = fork;
task(), exit() if $pid == 0;
if ($pid == 0) {
task1a() if ($i % 2 != 0);
task1b() if ($i % 2 == 0);
exit();
}
return $pid;
}

sub task2a {
$mutex->lock_exclusive2;
sleep(1) for 1..2;
$mutex->unlock2;
}
sub task2b {
my $guard = $mutex->guard_lock2;
sleep(1) for 1..2;
}

sub spawn2 {
my ($i) = @_;
my $pid = fork;
task2(), exit() if $pid == 0;
if ($pid == 0) {
task2a() if ($i % 2 != 0);
task2b() if ($i % 2 == 0);
exit();
}
return $pid;
}

my $start = time;
my @pids = map { spawn(), spawn2() } 1..3;
my @pids = map { spawn1($_), spawn2($_) } 1..3;

waitpid($_, 0) for @pids;

Expand Down
16 changes: 13 additions & 3 deletions xt/channel_lock.t
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,29 @@ my $mutex = MCE::Mutex->new( impl => 'Channel' );

is($mutex->impl(), 'Channel', 'implementation name');

sub task {
sub task1 {
$mutex->lock_exclusive;
sleep(1) for 1..2;
$mutex->unlock;
}
sub task2 {
my $guard = $mutex->guard_lock;
sleep(1) for 1..2;
}

sub spawn {
my ($i) = @_;
my $pid = fork;
task(), exit() if $pid == 0;
if ($pid == 0) {
task1() if ($i % 2 != 0);
task2() if ($i % 2 == 0);
exit();
}
return $pid;
}

my $start = time;
my @pids = map { spawn() } 1..3;
my @pids = map { spawn($_) } 1..3;

waitpid($_, 0) for @pids;

Expand Down
16 changes: 13 additions & 3 deletions xt/flock_lock.t
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,29 @@ my $mutex = MCE::Mutex->new( impl => 'Flock' );

is($mutex->impl(), 'Flock', 'implementation name');

sub task {
sub task1 {
$mutex->lock_exclusive;
sleep 1;
$mutex->unlock;
}
sub task2 {
my $guard = $mutex->guard_lock;
sleep 1;
}

sub spawn {
my ($i) = @_;
my $pid = fork;
task(), exit() if $pid == 0;
if ($pid == 0) {
task1() if ($i % 2 != 0);
task2() if ($i % 2 == 0);
exit();
}
return $pid;
}

my $start = time;
my @pids = map { spawn() } 1..4;
my @pids = map { spawn($_) } 1..4;

waitpid($_, 0) for @pids;

Expand Down

0 comments on commit fdb18b2

Please sign in to comment.