From 7123e1acfb0c1249855b9829e775998a4a2bbc58 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Fri, 6 Dec 2024 17:59:04 +0000 Subject: [PATCH 1/5] Allow queueing of callbacks in destructors --- src/EventLoop/Internal/AbstractDriver.php | 54 +++++---- src/EventLoop/Internal/DriverSuspension.php | 2 + test/EventLoopTest.php | 124 ++++++++++++++++++++ 3 files changed, 158 insertions(+), 22 deletions(-) diff --git a/src/EventLoop/Internal/AbstractDriver.php b/src/EventLoop/Internal/AbstractDriver.php index fc2e732..8a2e2cc 100644 --- a/src/EventLoop/Internal/AbstractDriver.php +++ b/src/EventLoop/Internal/AbstractDriver.php @@ -48,6 +48,7 @@ abstract class AbstractDriver implements Driver private readonly \Closure $interruptCallback; private readonly \Closure $queueCallback; + /** @var \Closure(): (?\Closure(): mixed) */ private readonly \Closure $runCallback; private readonly \stdClass $internalSuspensionMarker; @@ -87,13 +88,16 @@ public function __construct() /** @psalm-suppress InvalidArgument */ $this->interruptCallback = $this->setInterrupt(...); $this->queueCallback = $this->queue(...); - $this->runCallback = function () { - if ($this->fiber->isTerminated()) { - $this->createLoopFiber(); - } + $this->runCallback = + /** @return ?\Closure(): mixed */ + function (): ?\Closure { + if ($this->fiber->isTerminated()) { + $this->createLoopFiber(); + } - return $this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start(); - }; + // Returns a callback that returns the value of the {main} fiber, or null in case of deadlock. + return $this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start(); + }; } public function run(): void @@ -533,26 +537,32 @@ private function createLoopFiber(): void { $this->fiber = new \Fiber(function (): void { $this->stopped = false; + do { + // Invoke microtasks if we have some + $this->invokeCallbacks(); - // Invoke microtasks if we have some - $this->invokeCallbacks(); - - /** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */ - while (!$this->stopped) { - if ($this->interrupt) { - $this->invokeInterrupt(); - } + /** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */ + while (!$this->stopped) { + if ($this->interrupt) { + $this->invokeInterrupt(); + } - if ($this->isEmpty()) { - return; - } + if ($this->isEmpty()) { + while (\gc_collect_cycles()); + if (!$this->microtaskQueue->isEmpty() || !$this->callbackQueue->isEmpty() || $this->interrupt) { + continue 2; + } + return; + } - $previousIdle = $this->idle; - $this->idle = true; + $previousIdle = $this->idle; + $this->idle = true; - $this->tick($previousIdle); - $this->invokeCallbacks(); - } + $this->tick($previousIdle); + $this->invokeCallbacks(); + } + return; + } while (true); }); } diff --git a/src/EventLoop/Internal/DriverSuspension.php b/src/EventLoop/Internal/DriverSuspension.php index a3a7e29..90714af 100644 --- a/src/EventLoop/Internal/DriverSuspension.php +++ b/src/EventLoop/Internal/DriverSuspension.php @@ -28,6 +28,7 @@ final class DriverSuspension implements Suspension private bool $deadMain = false; /** + * @param \Closure(): (?\Closure(): mixed) $run * @param \WeakMap> $suspensions */ public function __construct( @@ -145,6 +146,7 @@ public function suspend(): mixed throw new \Error('Event loop terminated without resuming the current suspension (the cause is either a fiber deadlock, or an incorrectly unreferenced/canceled watcher):' . $info); } + \assert($result !== null); return $result(); } diff --git a/test/EventLoopTest.php b/test/EventLoopTest.php index 8f334f2..9aacf59 100644 --- a/test/EventLoopTest.php +++ b/test/EventLoopTest.php @@ -9,6 +9,130 @@ class EventLoopTest extends TestCase { + public function testSuspensionResumptionWithQueueInGarbageCollection(): void + { + $suspension = EventLoop::getSuspension(); + + $class = new class ($suspension) { + public function __construct(public Suspension $suspension) + { + } + public function __destruct() + { + $this->suspension->resume(true); + } + }; + $cycle = [$class, &$cycle]; + unset($class, $cycle); + + $ended = $suspension->suspend(); + + $this->assertTrue($ended); + } + + public function testEventLoopResumptionWithQueueInGarbageCollection(): void + { + $suspension = EventLoop::getSuspension(); + + $class = new class ($suspension) { + public function __construct(public Suspension $suspension) + { + } + public function __destruct() + { + EventLoop::queue($this->suspension->resume(...), true); + } + }; + $cycle = [$class, &$cycle]; + unset($class, $cycle); + + $ended = $suspension->suspend(); + + $this->assertTrue($ended); + } + + + public function testSuspensionResumptionWithQueueInGarbageCollectionNested(): void + { + $suspension = EventLoop::getSuspension(); + + $resumer = new class ($suspension) { + public function __construct(public Suspension $suspension) + { + } + public function __destruct() + { + $this->suspension->resume(true); + } + }; + + $class = new class ($resumer) { + public static ?object $staticReference = null; + public function __construct(object $resumer) + { + self::$staticReference = $resumer; + } + public function __destruct() + { + EventLoop::queue(function () { + $class = self::$staticReference; + $cycle = [$class, &$cycle]; + unset($class, $cycle); + + self::$staticReference = null; + }); + } + }; + $cycle = [$class, &$cycle]; + unset($class, $resumer, $cycle); + + + $ended = $suspension->suspend(); + + $this->assertTrue($ended); + } + + public function testEventLoopResumptionWithQueueInGarbageCollectionNested(): void + { + $suspension = EventLoop::getSuspension(); + + $resumer = new class ($suspension) { + public function __construct(public Suspension $suspension) + { + } + public function __destruct() + { + EventLoop::queue($this->suspension->resume(...), true); + } + }; + + $class = new class ($resumer) { + public static ?object $staticReference = null; + public function __construct(object $resumer) + { + self::$staticReference = $resumer; + } + public function __destruct() + { + EventLoop::queue(function () { + $class = self::$staticReference; + $cycle = [$class, &$cycle]; + unset($class, $cycle); + + self::$staticReference = null; + }); + } + }; + $cycle = [$class, &$cycle]; + unset($class, $resumer, $cycle); + + + $ended = $suspension->suspend(); + + $this->assertTrue($ended); + } + + public function testDelayWithNegativeDelay(): void { $this->expectException(\Error::class); From 38b195d12e770e43b40f1c021f7daeec645490b6 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Fri, 6 Dec 2024 19:45:10 +0100 Subject: [PATCH 2/5] Change approach --- src/EventLoop/Internal/AbstractDriver.php | 45 +++++++++------------ src/EventLoop/Internal/DriverSuspension.php | 6 ++- 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/src/EventLoop/Internal/AbstractDriver.php b/src/EventLoop/Internal/AbstractDriver.php index 8a2e2cc..e1d49f0 100644 --- a/src/EventLoop/Internal/AbstractDriver.php +++ b/src/EventLoop/Internal/AbstractDriver.php @@ -48,7 +48,7 @@ abstract class AbstractDriver implements Driver private readonly \Closure $interruptCallback; private readonly \Closure $queueCallback; - /** @var \Closure(): (?\Closure(): mixed) */ + /** @var \Closure(): ((\Closure(): mixed)|bool|null) */ private readonly \Closure $runCallback; private readonly \stdClass $internalSuspensionMarker; @@ -89,13 +89,12 @@ public function __construct() $this->interruptCallback = $this->setInterrupt(...); $this->queueCallback = $this->queue(...); $this->runCallback = - /** @return ?\Closure(): mixed */ - function (): ?\Closure { + /** @return (\Closure(): mixed)|bool|null */ + function (): \Closure|bool|null { if ($this->fiber->isTerminated()) { $this->createLoopFiber(); } - // Returns a callback that returns the value of the {main} fiber, or null in case of deadlock. return $this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start(); }; } @@ -537,32 +536,26 @@ private function createLoopFiber(): void { $this->fiber = new \Fiber(function (): void { $this->stopped = false; - do { - // Invoke microtasks if we have some - $this->invokeCallbacks(); - /** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */ - while (!$this->stopped) { - if ($this->interrupt) { - $this->invokeInterrupt(); - } - - if ($this->isEmpty()) { - while (\gc_collect_cycles()); - if (!$this->microtaskQueue->isEmpty() || !$this->callbackQueue->isEmpty() || $this->interrupt) { - continue 2; - } - return; - } + // Invoke microtasks if we have some + $this->invokeCallbacks(); - $previousIdle = $this->idle; - $this->idle = true; + /** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */ + while (!$this->stopped) { + if ($this->interrupt) { + $this->invokeInterrupt(); + } - $this->tick($previousIdle); - $this->invokeCallbacks(); + if ($this->isEmpty()) { + return; } - return; - } while (true); + + $previousIdle = $this->idle; + $this->idle = true; + + $this->tick($previousIdle); + $this->invokeCallbacks(); + } }); } diff --git a/src/EventLoop/Internal/DriverSuspension.php b/src/EventLoop/Internal/DriverSuspension.php index 90714af..d19ccbf 100644 --- a/src/EventLoop/Internal/DriverSuspension.php +++ b/src/EventLoop/Internal/DriverSuspension.php @@ -28,7 +28,7 @@ final class DriverSuspension implements Suspension private bool $deadMain = false; /** - * @param \Closure(): (?\Closure(): mixed) $run + * @param \Closure(): ((\Closure(): mixed)|bool|null) $run * @param \WeakMap> $suspensions */ public function __construct( @@ -117,6 +117,10 @@ public function suspend(): mixed // Awaiting from {main}. $result = ($this->run)(); + while ($this->pending && $result === false) { + + } + /** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */ if ($this->pending) { // This is now a dead {main} suspension. From d4b6aed391d31abbb25a2ffa32ee7f69ec671bc8 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Fri, 6 Dec 2024 19:54:11 +0100 Subject: [PATCH 3/5] Change approach --- src/EventLoop/Internal/AbstractDriver.php | 24 +++++++++++++-------- src/EventLoop/Internal/DriverSuspension.php | 9 +++++--- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/src/EventLoop/Internal/AbstractDriver.php b/src/EventLoop/Internal/AbstractDriver.php index e1d49f0..ded274f 100644 --- a/src/EventLoop/Internal/AbstractDriver.php +++ b/src/EventLoop/Internal/AbstractDriver.php @@ -48,7 +48,7 @@ abstract class AbstractDriver implements Driver private readonly \Closure $interruptCallback; private readonly \Closure $queueCallback; - /** @var \Closure(): ((\Closure(): mixed)|bool|null) */ + /** @var \Closure(): ((\Closure(): mixed)|bool) */ private readonly \Closure $runCallback; private readonly \stdClass $internalSuspensionMarker; @@ -89,13 +89,13 @@ public function __construct() $this->interruptCallback = $this->setInterrupt(...); $this->queueCallback = $this->queue(...); $this->runCallback = - /** @return (\Closure(): mixed)|bool|null */ - function (): \Closure|bool|null { + /** @return (\Closure(): mixed)|bool */ + function (): \Closure|bool { if ($this->fiber->isTerminated()) { $this->createLoopFiber(); } - return $this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start(); + return ($this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start()) ?? $this->fiber->getReturn(); }; } @@ -493,9 +493,11 @@ private function tick(bool $previousIdle): void $this->dispatch($blocking); } - private function invokeCallbacks(): void + private function invokeCallbacks(): bool { + $didWork = false; while (!$this->microtaskQueue->isEmpty() || !$this->callbackQueue->isEmpty()) { + $didWork = true; /** @noinspection PhpUnhandledExceptionInspection */ $yielded = $this->callbackFiber->isStarted() ? $this->callbackFiber->resume() @@ -509,6 +511,7 @@ private function invokeCallbacks(): void $this->invokeInterrupt(); } } + return $didWork; } /** @@ -534,28 +537,31 @@ private function invokeInterrupt(): void private function createLoopFiber(): void { - $this->fiber = new \Fiber(function (): void { + $this->fiber = new \Fiber(function (): bool { $this->stopped = false; // Invoke microtasks if we have some - $this->invokeCallbacks(); + $didWork = $this->invokeCallbacks(); /** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */ while (!$this->stopped) { if ($this->interrupt) { + $didWork = true; $this->invokeInterrupt(); } if ($this->isEmpty()) { - return; + return $didWork; } $previousIdle = $this->idle; $this->idle = true; $this->tick($previousIdle); - $this->invokeCallbacks(); + $didWork = $this->invokeCallbacks(); } + + return $didWork; }); } diff --git a/src/EventLoop/Internal/DriverSuspension.php b/src/EventLoop/Internal/DriverSuspension.php index d19ccbf..9a4dae6 100644 --- a/src/EventLoop/Internal/DriverSuspension.php +++ b/src/EventLoop/Internal/DriverSuspension.php @@ -28,7 +28,7 @@ final class DriverSuspension implements Suspension private bool $deadMain = false; /** - * @param \Closure(): ((\Closure(): mixed)|bool|null) $run + * @param \Closure(): ((\Closure(): mixed)|bool) $run * @param \WeakMap> $suspensions */ public function __construct( @@ -117,8 +117,11 @@ public function suspend(): mixed // Awaiting from {main}. $result = ($this->run)(); - while ($this->pending && $result === false) { - + if ($this->pending && \is_bool($result)) { + do { + while (\gc_collect_cycles()); + $result = ($this->run)(); + } while ($this->pending && $result === true); } /** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */ From 08d86fd3887a0bcd90c62def4164965fe836f2e0 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Fri, 6 Dec 2024 19:55:09 +0100 Subject: [PATCH 4/5] Change approach --- src/EventLoop/Internal/AbstractDriver.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/EventLoop/Internal/AbstractDriver.php b/src/EventLoop/Internal/AbstractDriver.php index ded274f..7a0382a 100644 --- a/src/EventLoop/Internal/AbstractDriver.php +++ b/src/EventLoop/Internal/AbstractDriver.php @@ -558,7 +558,7 @@ private function createLoopFiber(): void $this->idle = true; $this->tick($previousIdle); - $didWork = $this->invokeCallbacks(); + $didWork = $this->invokeCallbacks() || $didWork; } return $didWork; From 3a8fea6e54099250172687ba517db83ea701fac0 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sat, 7 Dec 2024 00:32:54 +0100 Subject: [PATCH 5/5] Fixes --- src/EventLoop/Internal/DriverSuspension.php | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/EventLoop/Internal/DriverSuspension.php b/src/EventLoop/Internal/DriverSuspension.php index 9a4dae6..c848654 100644 --- a/src/EventLoop/Internal/DriverSuspension.php +++ b/src/EventLoop/Internal/DriverSuspension.php @@ -117,10 +117,12 @@ public function suspend(): mixed // Awaiting from {main}. $result = ($this->run)(); + /** @var bool $this->pending */ if ($this->pending && \is_bool($result)) { do { while (\gc_collect_cycles()); $result = ($this->run)(); + /** @var bool $this->pending */ } while ($this->pending && $result === true); } @@ -132,7 +134,9 @@ public function suspend(): mixed // Unset suspension for {main} using queue closure. unset($this->suspensions[$this->queue]); - $result && $result(); // Unwrap any uncaught exceptions from the event loop + if ($result instanceof \Closure) { + $result(); + } // Unwrap any uncaught exceptions from the event loop \gc_collect_cycles(); // Collect any circular references before dumping pending suspensions. @@ -153,7 +157,7 @@ public function suspend(): mixed throw new \Error('Event loop terminated without resuming the current suspension (the cause is either a fiber deadlock, or an incorrectly unreferenced/canceled watcher):' . $info); } - \assert($result !== null); + \assert($result instanceof \Closure); return $result(); }