Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow queueing of callbacks in destructors #99

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions src/EventLoop/Internal/AbstractDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ abstract class AbstractDriver implements Driver

private readonly \Closure $interruptCallback;
private readonly \Closure $queueCallback;
/** @var \Closure(): ((\Closure(): mixed)|bool) */
private readonly \Closure $runCallback;

private readonly \stdClass $internalSuspensionMarker;
Expand Down Expand Up @@ -87,13 +88,15 @@ public function __construct()
/** @psalm-suppress InvalidArgument */
$this->interruptCallback = $this->setInterrupt(...);
$this->queueCallback = $this->queue(...);
$this->runCallback = function () {
if ($this->fiber->isTerminated()) {
$this->createLoopFiber();
}
$this->runCallback =
/** @return (\Closure(): mixed)|bool */
function (): \Closure|bool {
if ($this->fiber->isTerminated()) {
$this->createLoopFiber();
}

return $this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start();
};
return ($this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start()) ?? $this->fiber->getReturn();
};
}

public function run(): void
Expand Down Expand Up @@ -490,9 +493,11 @@ private function tick(bool $previousIdle): void
$this->dispatch($blocking);
}

private function invokeCallbacks(): void
private function invokeCallbacks(): bool
{
$didWork = false;
while (!$this->microtaskQueue->isEmpty() || !$this->callbackQueue->isEmpty()) {
$didWork = true;
/** @noinspection PhpUnhandledExceptionInspection */
$yielded = $this->callbackFiber->isStarted()
? $this->callbackFiber->resume()
Expand All @@ -506,6 +511,7 @@ private function invokeCallbacks(): void
$this->invokeInterrupt();
}
}
return $didWork;
}

/**
Expand All @@ -531,28 +537,31 @@ private function invokeInterrupt(): void

private function createLoopFiber(): void
{
$this->fiber = new \Fiber(function (): void {
$this->fiber = new \Fiber(function (): bool {
$this->stopped = false;

// Invoke microtasks if we have some
$this->invokeCallbacks();
$didWork = $this->invokeCallbacks();

/** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */
while (!$this->stopped) {
if ($this->interrupt) {
$didWork = true;
$this->invokeInterrupt();
}

if ($this->isEmpty()) {
return;
return $didWork;
}

$previousIdle = $this->idle;
$this->idle = true;

$this->tick($previousIdle);
$this->invokeCallbacks();
$didWork = $this->invokeCallbacks() || $didWork;
}

return $didWork;
});
}

Expand Down
15 changes: 14 additions & 1 deletion src/EventLoop/Internal/DriverSuspension.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ final class DriverSuspension implements Suspension
private bool $deadMain = false;

/**
* @param \Closure(): ((\Closure(): mixed)|bool) $run
* @param \WeakMap<object, \WeakReference<DriverSuspension>> $suspensions
*/
public function __construct(
Expand Down Expand Up @@ -116,6 +117,15 @@ public function suspend(): mixed
// Awaiting from {main}.
$result = ($this->run)();

/** @var bool $this->pending */
if ($this->pending && \is_bool($result)) {
do {
while (\gc_collect_cycles());
$result = ($this->run)();
/** @var bool $this->pending */
} while ($this->pending && $result === true);
}

/** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */
if ($this->pending) {
// This is now a dead {main} suspension.
Expand All @@ -124,7 +134,9 @@ public function suspend(): mixed
// Unset suspension for {main} using queue closure.
unset($this->suspensions[$this->queue]);

$result && $result(); // Unwrap any uncaught exceptions from the event loop
if ($result instanceof \Closure) {
$result();
} // Unwrap any uncaught exceptions from the event loop

\gc_collect_cycles(); // Collect any circular references before dumping pending suspensions.

Expand All @@ -145,6 +157,7 @@ public function suspend(): mixed
throw new \Error('Event loop terminated without resuming the current suspension (the cause is either a fiber deadlock, or an incorrectly unreferenced/canceled watcher):' . $info);
}

\assert($result instanceof \Closure);
return $result();
}

Expand Down
124 changes: 124 additions & 0 deletions test/EventLoopTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,130 @@

class EventLoopTest extends TestCase
{
public function testSuspensionResumptionWithQueueInGarbageCollection(): void
{
$suspension = EventLoop::getSuspension();

$class = new class ($suspension) {
public function __construct(public Suspension $suspension)
{
}
public function __destruct()
{
$this->suspension->resume(true);
}
};
$cycle = [$class, &$cycle];
unset($class, $cycle);

$ended = $suspension->suspend();

$this->assertTrue($ended);
}

public function testEventLoopResumptionWithQueueInGarbageCollection(): void
{
$suspension = EventLoop::getSuspension();

$class = new class ($suspension) {
public function __construct(public Suspension $suspension)
{
}
public function __destruct()
{
EventLoop::queue($this->suspension->resume(...), true);
}
};
$cycle = [$class, &$cycle];
unset($class, $cycle);

$ended = $suspension->suspend();

$this->assertTrue($ended);
}


public function testSuspensionResumptionWithQueueInGarbageCollectionNested(): void
{
$suspension = EventLoop::getSuspension();

$resumer = new class ($suspension) {
public function __construct(public Suspension $suspension)
{
}
public function __destruct()
{
$this->suspension->resume(true);
}
};

$class = new class ($resumer) {
public static ?object $staticReference = null;
public function __construct(object $resumer)
{
self::$staticReference = $resumer;
}
public function __destruct()
{
EventLoop::queue(function () {
$class = self::$staticReference;
$cycle = [$class, &$cycle];
unset($class, $cycle);

self::$staticReference = null;
});
}
};
$cycle = [$class, &$cycle];
unset($class, $resumer, $cycle);


$ended = $suspension->suspend();

$this->assertTrue($ended);
}

public function testEventLoopResumptionWithQueueInGarbageCollectionNested(): void
{
$suspension = EventLoop::getSuspension();

$resumer = new class ($suspension) {
public function __construct(public Suspension $suspension)
{
}
public function __destruct()
{
EventLoop::queue($this->suspension->resume(...), true);
}
};

$class = new class ($resumer) {
public static ?object $staticReference = null;
public function __construct(object $resumer)
{
self::$staticReference = $resumer;
}
public function __destruct()
{
EventLoop::queue(function () {
$class = self::$staticReference;
$cycle = [$class, &$cycle];
unset($class, $cycle);

self::$staticReference = null;
});
}
};
$cycle = [$class, &$cycle];
unset($class, $resumer, $cycle);


$ended = $suspension->suspend();

$this->assertTrue($ended);
}


public function testDelayWithNegativeDelay(): void
{
$this->expectException(\Error::class);
Expand Down
Loading