From 71f9bf34ecee032f24d650550aa795c98ccf8c0b Mon Sep 17 00:00:00 2001 From: Revolt Date: Sun, 5 Sep 2021 16:35:10 +0200 Subject: [PATCH] Initial commit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Revolt is the result of combining years of experience of amphp's and ReactPHP's event loop implementations. Co-authored-by: Aaron Piotrowski Co-authored-by: Cees-Jan Kiewiet Co-authored-by: Christian Lück Co-authored-by: Niklas Keller --- .editorconfig | 8 + .gitattributes | 17 + .github/CODEOWNERS | 1 + .github/workflows/ci.yml | 73 + .gitignore | 7 + .php-cs-fixer.dist.php | 88 + CHANGELOG.md | 1 + CONTRIBUTING.md | 41 + LICENSE | 25 + README.md | 55 + composer.json | 66 + docs/README.md | 117 ++ docs/api.md | 368 ++++ examples/benchmark-memory.php | 72 + examples/benchmark-ticks-delay.php | 19 + examples/benchmark-ticks.php | 13 + examples/benchmark-timers-delay.php | 19 + examples/benchmark-timers.php | 13 + examples/consume-stdin.php | 30 + examples/generate-yes.php | 41 + examples/http-client-async.php | 55 + examples/http-client-blocking.php | 33 + examples/http-client-suspension.php | 57 + examples/http-server.php | 36 + examples/invalid-callback-return.php | 22 + examples/stdin-timeout.php | 32 + examples/stdin.php | 27 + examples/ticks.php | 15 + examples/timers.php | 25 + phpunit.xml.dist | 18 + psalm.examples.xml | 16 + psalm.xml | 70 + src/EventLoop.php | 423 +++++ src/EventLoop/Driver.php | 284 +++ src/EventLoop/Driver/EvDriver.php | 226 +++ src/EventLoop/Driver/EventDriver.php | 247 +++ src/EventLoop/Driver/StreamSelectDriver.php | 325 ++++ src/EventLoop/Driver/TracingDriver.php | 250 +++ src/EventLoop/Driver/UvDriver.php | 269 +++ src/EventLoop/DriverFactory.php | 79 + src/EventLoop/Internal/AbstractDriver.php | 693 +++++++ src/EventLoop/Internal/DeferWatcher.php | 8 + src/EventLoop/Internal/SignalWatcher.php | 15 + src/EventLoop/Internal/StreamReadWatcher.php | 8 + src/EventLoop/Internal/StreamWatcher.php | 18 + src/EventLoop/Internal/StreamWriteWatcher.php | 8 + src/EventLoop/Internal/TimerQueue.php | 154 ++ src/EventLoop/Internal/TimerWatcher.php | 17 + src/EventLoop/Internal/Watcher.php | 44 + src/EventLoop/Internal/functions.php | 21 + src/EventLoop/InvalidCallbackError.php | 55 + src/EventLoop/InvalidWatcherError.php | 32 + src/EventLoop/Suspension.php | 111 ++ src/EventLoop/UnsupportedFeatureException.php | 12 + src/functions.php | 26 + stubs/Fiber.php | 115 ++ stubs/FiberError.php | 15 + stubs/FiberExit.php | 15 + stubs/ReflectionFiber.php | 58 + stubs/uv.php | 3 + test/Driver/DriverTest.php | 1612 +++++++++++++++++ test/Driver/EvDriverTest.php | 26 + test/Driver/EventDriverTest.php | 26 + test/Driver/StreamSelectDriverTest.php | 114 ++ test/Driver/TimerQueueTest.php | 68 + test/Driver/TracingDriverTest.php | 23 + test/Driver/UvDriverTest.php | 27 + test/EventLoopTest.php | 197 ++ tools/php-cs-fixer/composer.json | 5 + 69 files changed, 7109 insertions(+) create mode 100644 .editorconfig create mode 100644 .gitattributes create mode 100644 .github/CODEOWNERS create mode 100644 .github/workflows/ci.yml create mode 100644 .gitignore create mode 100644 .php-cs-fixer.dist.php create mode 100644 CHANGELOG.md create mode 100644 CONTRIBUTING.md create mode 100644 LICENSE create mode 100644 README.md create mode 100644 composer.json create mode 100644 docs/README.md create mode 100644 docs/api.md create mode 100644 examples/benchmark-memory.php create mode 100644 examples/benchmark-ticks-delay.php create mode 100644 examples/benchmark-ticks.php create mode 100644 examples/benchmark-timers-delay.php create mode 100644 examples/benchmark-timers.php create mode 100644 examples/consume-stdin.php create mode 100644 examples/generate-yes.php create mode 100644 examples/http-client-async.php create mode 100644 examples/http-client-blocking.php create mode 100644 examples/http-client-suspension.php create mode 100644 examples/http-server.php create mode 100644 examples/invalid-callback-return.php create mode 100644 examples/stdin-timeout.php create mode 100644 examples/stdin.php create mode 100644 examples/ticks.php create mode 100644 examples/timers.php create mode 100644 phpunit.xml.dist create mode 100644 psalm.examples.xml create mode 100644 psalm.xml create mode 100644 src/EventLoop.php create mode 100644 src/EventLoop/Driver.php create mode 100644 src/EventLoop/Driver/EvDriver.php create mode 100644 src/EventLoop/Driver/EventDriver.php create mode 100644 src/EventLoop/Driver/StreamSelectDriver.php create mode 100644 src/EventLoop/Driver/TracingDriver.php create mode 100644 src/EventLoop/Driver/UvDriver.php create mode 100644 src/EventLoop/DriverFactory.php create mode 100644 src/EventLoop/Internal/AbstractDriver.php create mode 100644 src/EventLoop/Internal/DeferWatcher.php create mode 100644 src/EventLoop/Internal/SignalWatcher.php create mode 100644 src/EventLoop/Internal/StreamReadWatcher.php create mode 100644 src/EventLoop/Internal/StreamWatcher.php create mode 100644 src/EventLoop/Internal/StreamWriteWatcher.php create mode 100644 src/EventLoop/Internal/TimerQueue.php create mode 100644 src/EventLoop/Internal/TimerWatcher.php create mode 100644 src/EventLoop/Internal/Watcher.php create mode 100644 src/EventLoop/Internal/functions.php create mode 100644 src/EventLoop/InvalidCallbackError.php create mode 100644 src/EventLoop/InvalidWatcherError.php create mode 100644 src/EventLoop/Suspension.php create mode 100644 src/EventLoop/UnsupportedFeatureException.php create mode 100644 src/functions.php create mode 100644 stubs/Fiber.php create mode 100644 stubs/FiberError.php create mode 100644 stubs/FiberExit.php create mode 100644 stubs/ReflectionFiber.php create mode 100644 stubs/uv.php create mode 100644 test/Driver/DriverTest.php create mode 100644 test/Driver/EvDriverTest.php create mode 100644 test/Driver/EventDriverTest.php create mode 100644 test/Driver/StreamSelectDriverTest.php create mode 100644 test/Driver/TimerQueueTest.php create mode 100644 test/Driver/TracingDriverTest.php create mode 100644 test/Driver/UvDriverTest.php create mode 100644 test/EventLoopTest.php create mode 100644 tools/php-cs-fixer/composer.json diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..131e7a2 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,8 @@ +root = true + +[*] +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true +indent_style = space +charset = utf-8 diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..402347f --- /dev/null +++ b/.gitattributes @@ -0,0 +1,17 @@ +.github export-ignore +docs export-ignore +examples export-ignore +stubs export-ignore +test export-ignore +tools export-ignore +.editorconfig export-ignore +.gitattributes export-ignore +.gitignore export-ignore +.php-cs-fixer.* export-ignore +.phpunit.* export-ignore +CHANGELOG.md export-ignore +CONTRIBUTING.md export-ignore +phpunit.xml.dist export-ignore +psalm.examples.xml export-ignore +psalm.xml export-ignore +README.md export-ignore diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..d13351f --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +* @revoltphp/event-loop-maintainers diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..3ffc4f4 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,73 @@ +name: Continuous Integration + +on: + - push + - pull_request + +jobs: + tests: + timeout-minutes: 5 + strategy: + fail-fast: false + matrix: + include: + - operating-system: 'ubuntu-latest' + php-version: '8.0' + composer-flags: '--ignore-platform-req=php' + + name: PHP ${{ matrix.php-version }} ${{ matrix.job-description }} + + runs-on: ${{ matrix.operating-system }} + + steps: + - name: Set git to use LF + run: | + git config --global core.autocrlf false + git config --global core.eol lf + + - name: Checkout code + uses: actions/checkout@v2 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php-version }} + extensions: fiber-amphp/ext-fiber@master + + - name: Get Composer cache directory + id: composer-cache + run: echo "::set-output name=dir::$(composer config cache-dir)" + + - name: Cache dependencies + uses: actions/cache@v2 + with: + path: ${{ steps.composer-cache.outputs.dir }} + key: composer-${{ runner.os }}-${{ matrix.php-version }}-${{ hashFiles('**/composer.*') }}-${{ matrix.composer-flags }} + restore-keys: | + composer-${{ runner.os }}-${{ matrix.php-version }}-${{ hashFiles('**/composer.*') }}- + composer-${{ runner.os }}-${{ matrix.php-version }}- + composer-${{ runner.os }}- + composer- + + - name: Install dependencies + uses: nick-invision/retry@v2 + with: + timeout_minutes: 5 + max_attempts: 5 + retry_wait_seconds: 30 + command: | + composer update --optimize-autoloader --no-interaction --no-progress ${{ matrix.composer-flags }} + composer info -D + + - name: Run tests + run: vendor/bin/phpunit ${{ matrix.phpunit-flags }} + + - name: Run Psalm + run: vendor/bin/psalm.phar --show-info=true + + - name: Run style fixer + env: + PHP_CS_FIXER_IGNORE_ENV: 1 + run: | + composer install --optimize-autoloader --no-interaction --no-progress --working-dir=tools/php-cs-fixer + tools/php-cs-fixer/vendor/bin/php-cs-fixer --diff --dry-run -v fix diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f43d0ee --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +build +composer.lock +composer.phar +coverage +phpunit.xml +vendor +*.cache diff --git a/.php-cs-fixer.dist.php b/.php-cs-fixer.dist.php new file mode 100644 index 0000000..0624dda --- /dev/null +++ b/.php-cs-fixer.dist.php @@ -0,0 +1,88 @@ +setRiskyAllowed(true); + $this->setLineEnding("\n"); + + $this->src = __DIR__ . '/src'; + } + + public function getRules(): array + { + return [ + "@PSR1" => true, + "@PSR2" => true, + "@PSR12" => true, + "braces" => [ + "allow_single_line_closure" => true, + ], + "array_syntax" => ["syntax" => "short"], + "cast_spaces" => true, + "combine_consecutive_unsets" => true, + "function_to_constant" => true, + "native_function_invocation" => [ + 'include' => [ + '@internal', + 'pcntl_async_signals', + 'pcntl_signal_dispatch', + 'pcntl_signal', + 'posix_kill', + 'uv_loop_new', + 'uv_poll_start', + 'uv_poll_stop', + 'uv_now', + 'uv_run', + 'uv_poll_init_socket', + 'uv_timer_init', + 'uv_timer_start', + 'uv_timer_stop', + 'uv_signal_init', + 'uv_signal_start', + 'uv_signal_stop', + 'uv_update_time', + 'uv_is_active', + ], + ], + "multiline_whitespace_before_semicolons" => true, + "no_unused_imports" => true, + "no_useless_else" => true, + "no_useless_return" => true, + "no_whitespace_before_comma_in_array" => true, + "no_whitespace_in_blank_line" => true, + "non_printable_character" => true, + "normalize_index_brace" => true, + "ordered_imports" => ['imports_order' => ['class', 'const', 'function']], + "php_unit_construct" => true, + "php_unit_dedicate_assert" => true, + "php_unit_fqcn_annotation" => true, + "phpdoc_summary" => true, + "phpdoc_types" => true, + "psr_autoloading" => ['dir' => $this->src], + "return_type_declaration" => ["space_before" => "none"], + "short_scalar_cast" => true, + "single_blank_line_before_namespace" => true, + "line_ending" => true, + ]; + } +} + +$config = new Config; +$config->getFinder() + ->in(__DIR__ . '/examples') + ->in(__DIR__ . '/src') + ->in(__DIR__ . '/test'); + +$config->setCacheFile(__DIR__ . '/.php-cs-fixer.cache'); + +return $config; diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..c1f6417 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1 @@ +## TBD diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..939e5d6 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,41 @@ +## Submitting useful bug reports + +Please search existing issues first to make sure this is not a duplicate. +Every issue report has a cost for the developers required to field it; be +respectful of others' time and ensure your report isn't spurious prior to +submission. Please adhere to [sound bug reporting principles](http://www.chiark.greenend.org.uk/~sgtatham/bugs.html). + +## Development ideology + +Truths which we believe to be self-evident: + +- **It's an asynchronous world.** Be wary of anything that undermines + async principles. + +- **The answer is not more options.** If you feel compelled to expose + new preferences to the user it's very possible you've made a wrong + turn somewhere. + +- **There are no power users.** The idea that some users "understand" + concepts better than others has proven to be, for the most part, false. + If anything, "power users" are more dangerous than the rest, and we + should avoid exposing dangerous functionality to them. + +## Code style + +The project adheres to the [PSR-2 style guide](https://github.com/php-fig/fig-standards/blob/master/accepted/PSR-2-coding-style-guide.md +). + +To apply code standards you can run `php-cs-fixer` with following composer command: + +```bash +composer code-style +``` + +## Running the tests + +Run the test suite from root directory: + +```bash +composer test +``` diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0b5ae43 --- /dev/null +++ b/LICENSE @@ -0,0 +1,25 @@ + +The MIT License (MIT) + +Copyright (c) 2021 Revolt (Aaron Piotrowski, Cees-Jan Kiewiet, Christian Lück, Niklas Keller, and contributors) +Copyright (c) 2015-2021 amphp (Daniel Lowrey, Aaron Piotrowski, Niklas Keller, Bob Weinand, and contributors) +Copyright (c) 2012-2021 ReactPHP (Christian Lück, Cees-Jan Kiewiet, Jan Sorgalla, Chris Boden, Igor Wiedler, and contributors) +Copyright (c) 2016 PHP Asynchronous Interoperability Group + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..047ea56 --- /dev/null +++ b/README.md @@ -0,0 +1,55 @@ +# Revolt + +Revolt is a rock-solid event loop for concurrent PHP applications. + +## Motivation + +Traditionally, PHP has a synchronous execution flow, doing one thing at a time. +If you query a database, you send the query and wait for the response from the database server in a blocking manner. +Once you have the response, you can start doing the next thing. + +Instead of sitting there and doing nothing while waiting, we could already send the next database query, or do an HTTP call to an API. +Making use of the time we usually spend on waiting for I/O can speed up the total execution time. + +A single scheduler – also called event loop – is required to allow for [cooperative multitasking](https://en.wikipedia.org/wiki/Cooperative_multitasking), which this package provides. + +## Installation + +This package can be installed as a [Composer](https://getcomposer.org/) dependency. + +```bash +composer require revolt/event-loop +``` + +This installs the basic building block for building concurrent applications in PHP. + +## Documentation + +Documentation can be found in the [`./docs`](./docs) directory. + +## Requirements + +This package requires at least PHP 8.0. To take advantage of [Fibers](https://wiki.php.net/rfc/fibers), either [`ext-fiber`](https://github.com/amphp/ext-fiber) or PHP 8.1+ is required. + +##### Optional Extensions + +Extensions are only needed if your application necessitates a high numbers of concurrent socket connections, usually this limit is configured up to 1024 file descriptors. + +- [`ev`](https://pecl.php.net/package/ev) +- [`event`](https://pecl.php.net/package/event) +- [`uv`](https://github.com/amphp/ext-uv) + +## Examples + +Examples can be found in the [`./examples`](./examples) directory of this repository. + +## Versioning + +`revolt/event-loop` follows the [semver](https://semver.org/) semantic versioning specification. + +## License + +The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information. + +Revolt is the result of combining years of experience of amphp's and ReactPHP's +event loop implementations. diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..6bb5f4b --- /dev/null +++ b/composer.json @@ -0,0 +1,66 @@ +{ + "name": "revolt/event-loop", + "description": "Rock-solid event loop for concurrent PHP applications.", + "keywords": [ + "async", + "asynchronous", + "concurrency", + "non-blocking", + "event", + "event-loop" + ], + "license": "MIT", + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Cees-Jan Kiewiet", + "email": "ceesjank@gmail.com" + }, + { + "name": "Christian Lück", + "email": "christian@clue.engineering" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "require": { + "php": ">=8" + }, + "require-dev": { + "ext-json": "*", + "phpunit/phpunit": "^9", + "jetbrains/phpstorm-stubs": "^2019.3", + "psalm/phar": "^4.7" + }, + "autoload": { + "psr-4": { + "Revolt\\": "src" + }, + "files": [ + "src/functions.php", + "src/EventLoop/Internal/functions.php" + ] + }, + "autoload-dev": { + "psr-4": { + "Revolt\\EventLoop\\": "test" + } + }, + "support": { + "issues": "https://github.com/revoltphp/event-loop/issues" + }, + "extra": { + "branch-alias": { + "dev-main": "1.x-dev" + } + }, + "scripts": { + "test": "@php -dzend.assertions=1 -dassert.exception=1 ./vendor/bin/phpunit", + "code-style": "@php tools/php-cs-fixer/vendor/bin/php-cs-fixer fix" + } +} diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..cb18340 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,117 @@ +It may surprise people to learn that the PHP standard library already has everything we need to write event-driven and non-blocking applications. We only reach the limits of native PHP's functionality in this area when we ask it to poll thousands of file descriptors for IO activity at the same time. Even in this case, though, the fault is not with PHP but the underlying system `select()` call which is linear in its performance degradation as load increases. + +For performance that scales out to high volume we require more advanced capabilities currently found only in extensions. If you wish to, for example, service 10,000 simultaneous clients in an event loop backed socket server, you should use one of the event loop implementations based on a PHP extension. However, if you're using the package in a strictly local program for non-blocking concurrency, or you don't need to handle more than a few hundred simultaneous clients in a server application, the native PHP functionality should be adequate. + +## Global Accessor + +The package uses a global accessor for the event loop (scheduler) as there's only one event loop for each application. It doesn't make sense to have two loops running at the same time, as they would just have to schedule each other in a busy waiting manner to operate correctly. + +The event loop should be accessed through the methods provided by `Revolt\EventLoop`. On the first use of the accessor, it will automatically create the best available driver, see next section. + +`Revolt\EventLoop::setDriver()` can be used to set a custom driver. You can clear the scheduler in tests so each test runs with a fresh state to achieve test isolation. + +## Implementations + +The package offers different event loop implementations based on various backends. All implementations implement `Revolt\EventLoop\Driver`. Each behaves exactly the same way from an external API perspective. The main differences have to do with underlying performance characteristics. The current implementations are listed here: + +| Class | Extension | Repository | +| ------------------------- | ------------------------------------------------------ | ---------- | +| `Revolt\EventLoop\Driver\StreamSelectDriver` | – | - | +| `Revolt\EventLoop\Driver\EvDriver` | [`pecl/ev`](https://pecl.php.net/package/ev) | [`php-ev`](https://bitbucket.org/osmanov/pecl-ev) | +| `Revolt\EventLoop\Driver\EventDriver` | [`pecl/event`](https://pecl.php.net/package/event) | [`pecl-event`](https://bitbucket.org/osmanov/pecl-event) | +| `Revolt\EventLoop\Driver\UvDriver` | [`pecl/uv`](https://pecl.php.net/package/uv) | [`php-uv`](https://github.com/amphp/ext-uv) | + +It's not important to choose one implementation for your application. The package will automatically select the best available driver. It's perfectly fine to have one of the extensions in production while relying on the `StreamSelectDriver` locally for development. + +If you want to quickly switch implementations during development, e.g. for comparison or testing, you can set the `REVOLT_LOOP_DRIVER` environment variable to one of the classes. If you use a custom implementation, this only works if the implementation's constructor doesn't take any arguments. + +## Event Loop as Task Scheduler + +The first thing we need to understand to program effectively using an event loop is this: + +**The event loop is our task scheduler.** + +The event loop controls the program flow as long as it runs. Once we tell the event loop to run it will maintain control until the application errors out, has nothing left to do, or is explicitly stopped. + +Consider this very simple example: + +```php +resume(null); + + print "++ Executed after script ended" . PHP_EOL; +}); + +$suspension->suspend(); + +print '++ Script end' . PHP_EOL; +``` + +Upon execution of the above example you should see output like this: + +```plain +++ Executing watcher created by EventLoop::repeat() +++ Executing watcher created by EventLoop::repeat() +++ Executing watcher created by EventLoop::repeat() +++ Executing watcher created by EventLoop::repeat() +++ Executing watcher created by EventLoop::delay() +++ Script end +++ Executed after script ended +``` + +This output demonstrates that what happens inside the event loop is like its own separate program. Your script will not continue past the point of `$suspension->yield()` unless the suspension point is resumed with `$suspension->resume()` or `$suspension->throw()`. + +While an application can and often does take place entirely inside the confines of the event loop, we can also use the event loop to do things like the following example which imposes a short-lived timeout for interactive console input: + +```php +resume(null); +}); + +$timeoutWatcher = EventLoop::delay(5000, fn () => $suspension->resume(null)); + +$suspension->suspend(); + +EventLoop::cancel($readWatcher); +EventLoop::cancel($timeoutWatcher); +``` + +Obviously we could have simply used `fgets(STDIN)` synchronously in this example. We're just demonstrating that it's possible to move in and out of the event loop to mix synchronous tasks with non-blocking tasks as needed. + +Continue with the [Event Loop API](api.md). diff --git a/docs/api.md b/docs/api.md new file mode 100644 index 0000000..541e4f3 --- /dev/null +++ b/docs/api.md @@ -0,0 +1,368 @@ +This document describes the `Revolt\EventLoop\Loop` accessor. You might want to also read the documentation contained in +the source file, it's extensively documented and doesn't contain much distracting code. + +## `createSuspension()` + +The primary way an application interacts with the event loop is to schedule events for +execution. `EventLoop::createSuspension()` followed by `$suspension->suspend()` runs the event loop indefinitely until there +are no watchable timer events, IO streams or signals remaining to watch, or the suspension resumed. + +## Timers + +The event loop exposes several ways to schedule timers. Let's look at some details for each function. + +### `defer()` + +- Schedules a callback to execute in the next iteration of the event loop. +- This method guarantees a clean call stack to avoid starvation of other events in the current iteration of the loop. + A `defer` callback is *always* executed in the next tick of the event loop. +- After an `defer` timer executes, it is automatically garbage collected by the event loop so there is no need + for applications to manually cancel the associated timer. +- Like all watchers, `defer` timers may be disabled and re-enabled. If you disable this watcher between the time you + schedule it and the time that it actually runs the event loop *will not* be able to garbage collect it until it + executes. Therefore, you must manually cancel a `defer` watcher yourself if it never actually executes to free any + associated resources. + +**Example** + +```php + This is an advanced low-level API. Most users should use a stream abstraction instead. + +Watchers registered via `EventLoop::onReadable()` trigger their callbacks in the following situations: + +- When data is available to read on the stream under observation +- When the stream is at EOF (for sockets, this means the connection is broken) + +A common usage pattern for reacting to readable data looks something like this example: + +```php + You should always read a multiple of the configured chunk size (default: 8192), otherwise your code might not work as expected with loop backends other than `stream_select()`, see [amphp/amp#65](https://github.com/amphp/amp/issues/65) for more information. + +### `onWritable()` + +> This is an advanced low-level API. Most users should use a stream abstraction instead. + +- Streams are essentially *"always"* writable. The only time they aren't is when their respective write buffers are + full. + +A common usage pattern for reacting to writability involves initializing a writability watcher without enabling it when +a client first connects to a server. Once incomplete writes occur we're then able to "unpause" the write watcher +using `EventLoop::enable()` until data is fully sent without having to create and cancel new watcher resources on the same +stream multiple times. + +## Pausing, Resuming and Canceling Watchers + +All watchers, regardless of type, can be temporarily disabled and enabled in addition to being cleared +via `EventLoop::cancel()`. This allows for advanced capabilities such as disabling the acceptance of new socket clients in +server applications when simultaneity limits are reached. In general, the performance characteristics of watcher reuse +via pause/resume are favorable by comparison to repeatedly canceling and re-registering watchers. + +### `disable()` + +A simple disable example: + +```php += 3) { + EventLoop::cancel($watcherId); // <-- cancel myself! + } +}); + +EventLoop::run(); +``` + +It is also always safe to cancel a watcher from multiple places. A double-cancel will simply be ignored. + +### An Important Note on Writability + +Because streams are essentially *"always"* writable you should only enable writability watchers while you have data to +send. If you leave these watchers enabled when your application doesn't have anything to write the watcher will trigger +endlessly until disabled or canceled. This will max out your CPU. If you're seeing inexplicably high CPU usage in your +application it's a good bet you've got a writability watcher that you failed to disable or cancel after you were +finished with it. + +A standard pattern in this area is to initialize writability watchers in a disabled state before subsequently enabling +them at a later time as shown here: + +```php +stop(); + }); +} + +EventLoop::repeat(1, function () use (&$runs) { + $runs++; + + EventLoop::repeat(1_000, function (string $watcher) { + EventLoop::cancel($watcher); + }); +}); + +EventLoop::repeat($r, function () use (&$runs) { + $kmem = \round(\memory_get_usage() / 1024); + $kmemReal = \round(\memory_get_usage(true) / 1024); + echo "Runs:\t\t\t$runs\n"; + echo "Memory (internal):\t$kmem KiB\n"; + echo "Memory (real):\t\t$kmemReal KiB\n"; + echo \str_repeat('-', 50), "\n"; +}); + +echo "PHP Version:\t\t", PHP_VERSION, "\n"; +echo "Loop\t\t\t", \get_class(EventLoop::getDriver()), "\n"; +echo "Time\t\t\t", \date('r'), "\n"; + +echo \str_repeat('-', 50), "\n"; + +$beginTime = \time(); +EventLoop::run(); +$endTime = \time(); +$timeTaken = $endTime - $beginTime; + +echo "PHP Version:\t\t", PHP_VERSION, "\n"; +echo "Loop\t\t\t", \get_class(EventLoop::getDriver()), "\n"; +echo "Time\t\t\t", \date('r'), "\n"; +echo "Time taken\t\t", $timeTaken, " seconds\n"; +if ($timeTaken > 0) { + echo "Runs per second\t\t", \round($runs / $timeTaken), "\n"; +} diff --git a/examples/benchmark-ticks-delay.php b/examples/benchmark-ticks-delay.php new file mode 100644 index 0000000..fc8f131 --- /dev/null +++ b/examples/benchmark-ticks-delay.php @@ -0,0 +1,19 @@ + 0) { + --$ticks; + EventLoop::defer($tick); + } else { + echo 'done'; + } +}; + +$tick(); + +EventLoop::run(); diff --git a/examples/benchmark-ticks.php b/examples/benchmark-ticks.php new file mode 100644 index 0000000..fc066dd --- /dev/null +++ b/examples/benchmark-ticks.php @@ -0,0 +1,13 @@ + 0) { + --$ticks; + EventLoop::delay(0, $tick); + } else { + echo 'done'; + } +}; + +$tick(); + +EventLoop::run(); diff --git a/examples/benchmark-timers.php b/examples/benchmark-timers.php new file mode 100644 index 0000000..eb3b1fd --- /dev/null +++ b/examples/benchmark-timers.php @@ -0,0 +1,13 @@ + closed + if ($r === 0) { + EventLoop::cancel($watcher); + \stream_set_blocking($stdout, true); + \fclose($stdout); + \fwrite(STDERR, 'Stopped because STDOUT closed' . PHP_EOL); + + return; + } + + // implement a very simple ring buffer, unless everything has been written at once: + // everything written in this iteration will be appended for next iteration + if (isset($data[$r])) { + $data = \substr($data, $r) . \substr($data, 0, $r); + } +}); + +EventLoop::run(); diff --git a/examples/http-client-async.php b/examples/http-client-async.php new file mode 100644 index 0000000..b44d2bf --- /dev/null +++ b/examples/http-client-async.php @@ -0,0 +1,55 @@ + $suspension->resume(null)); + $suspension->suspend(); + EventLoop::cancel($watcher); + + // send HTTP request + \fwrite($stream, "GET " . $parsedUrl['path'] . " HTTP/1.1\r\nHost: " . $parsedUrl['host'] . "\r\nConnection: close\r\n\r\n"); + + $buffer = ''; + + // wait for HTTP response + $watcher = EventLoop::onReadable($stream, fn () => $suspension->resume(null)); + + do { + $suspension->suspend(); + $chunk = \fread($stream, 64 * 1024); + $buffer .= $chunk; + } while ($chunk !== ''); + + EventLoop::cancel($watcher); + \fclose($stream); + + return $buffer; +} + +echo fetch('http://www.google.com/'); diff --git a/examples/http-server.php b/examples/http-server.php new file mode 100644 index 0000000..fa1ab43 --- /dev/null +++ b/examples/http-server.php @@ -0,0 +1,36 @@ +resume(null); + + return new \stdClass(); +}); + +$suspension->suspend(); diff --git a/examples/stdin-timeout.php b/examples/stdin-timeout.php new file mode 100644 index 0000000..9c3e60a --- /dev/null +++ b/examples/stdin-timeout.php @@ -0,0 +1,32 @@ +#!/usr/bin/env php +resume(null); +}); + +$timeoutWatcher = EventLoop::delay(5000, fn () => $suspension->resume(null)); + +$suspension->suspend(); + +EventLoop::cancel($readWatcher); +EventLoop::cancel($timeoutWatcher); diff --git a/examples/stdin.php b/examples/stdin.php new file mode 100644 index 0000000..2b05091 --- /dev/null +++ b/examples/stdin.php @@ -0,0 +1,27 @@ +#!/usr/bin/env php +resume(null); +}); + +$suspension->suspend(); diff --git a/examples/ticks.php b/examples/ticks.php new file mode 100644 index 0000000..bafb655 --- /dev/null +++ b/examples/ticks.php @@ -0,0 +1,15 @@ +resume(null); + + print "++ Executed after script ended" . PHP_EOL; +}); + +$suspension->suspend(); + +print '++ Script end' . PHP_EOL; diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..6345e57 --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,18 @@ + + + + + src + + + + + test + + + diff --git a/psalm.examples.xml b/psalm.examples.xml new file mode 100644 index 0000000..2ac9d74 --- /dev/null +++ b/psalm.examples.xml @@ -0,0 +1,16 @@ + + + + + + + + + diff --git a/psalm.xml b/psalm.xml new file mode 100644 index 0000000..75c0788 --- /dev/null +++ b/psalm.xml @@ -0,0 +1,70 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/EventLoop.php b/src/EventLoop.php new file mode 100644 index 0000000..d2dfbab --- /dev/null +++ b/src/EventLoop.php @@ -0,0 +1,423 @@ +isRunning()) { + throw new \Error("Can't swap the event loop driver while the driver is running"); + } + + self::$fiber = null; + + try { + /** @psalm-suppress InternalClass */ + self::$driver = new class () extends AbstractDriver { + protected function activate(array $watchers): void + { + throw new \Error("Can't activate watcher during garbage collection."); + } + + protected function dispatch(bool $blocking): void + { + throw new \Error("Can't dispatch during garbage collection."); + } + + protected function deactivate(Watcher $watcher): void + { + // do nothing + } + + public function getHandle(): mixed + { + return null; + } + + protected function now(): float + { + return now(); + } + }; + + \gc_collect_cycles(); + } finally { + self::$driver = $driver; + } + } + + /** + * Queue a microtask. + * + * The queued callable MUST be executed immediately once the event loop gains control. Order of queueing MUST be + * preserved when executing the callbacks. Recursive scheduling can thus result in infinite loops, use with care. + * + * Does NOT create a watcher, thus CAN NOT be marked as disabled or unreferenced. Use {@see EventLoop::defer()} if + * you need these features. + * + * @param callable $callback The callback to queue. + * @param mixed ...$args The callback arguments. + */ + public static function queue(callable $callback, mixed ...$args): void + { + self::getDriver()->queue($callback, ...$args); + } + + /** + * Defer the execution of a callback. + * + * The deferred callable MUST be executed before any other type of watcher in a tick. Order of enabling MUST be + * preserved when executing the callbacks. + * + * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) + * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param callable(string) $callback The callback to defer. The `$watcherId` will be + * invalidated before the callback invocation. + * + * @return string An unique identifier that can be used to cancel, enable or disable the watcher. + */ + public static function defer(callable $callback): string + { + return self::getDriver()->defer($callback); + } + + /** + * Delay the execution of a callback. + * + * The delay is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be determined by which + * timers expire first, but timers with the same expiration time MAY be executed in any order. + * + * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) + * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param float $delay The amount of time, in seconds, to delay the execution for. + * @param callable(string) $callback The callback to delay. The `$watcherId` will be invalidated before + * the callback invocation. + * + * @return string An unique identifier that can be used to cancel, enable or disable the watcher. + */ + public static function delay(float $delay, callable $callback): string + { + return self::getDriver()->delay($delay, $callback); + } + + /** + * Repeatedly execute a callback. + * + * The interval between executions is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be + * determined by which timers expire first, but timers with the same expiration time MAY be executed in any order. + * The first execution is scheduled after the first interval period. + * + * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) + * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param float $interval The time interval, in seconds, to wait between executions. + * @param callable(string) $callback The callback to repeat. + * + * @return string An unique identifier that can be used to cancel, enable or disable the watcher. + */ + public static function repeat(float $interval, callable $callback): string + { + return self::getDriver()->repeat($interval, $callback); + } + + /** + * Execute a callback when a stream resource becomes readable or is closed for reading. + * + * Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the + * watcher when closing the resource locally. Drivers MAY choose to notify the user if there are watchers on invalid + * resources, but are not required to, due to the high performance impact. Watchers on closed resources are + * therefore undefined behavior. + * + * Multiple watchers on the same stream MAY be executed in any order. + * + * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) + * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param resource|object $stream The stream to monitor. + * @param callable(string, resource|object) $callback The callback to execute. + * + * @return string An unique identifier that can be used to cancel, enable or disable the watcher. + */ + public static function onReadable(mixed $stream, callable $callback): string + { + return self::getDriver()->onReadable($stream, $callback); + } + + /** + * Execute a callback when a stream resource becomes writable or is closed for writing. + * + * Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the + * watcher when closing the resource locally. Drivers MAY choose to notify the user if there are watchers on invalid + * resources, but are not required to, due to the high performance impact. Watchers on closed resources are + * therefore undefined behavior. + * + * Multiple watchers on the same stream MAY be executed in any order. + * + * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) + * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param resource|object $stream The stream to monitor. + * @param callable(string, resource|object) $callback The callback to execute. + * + * @return string An unique identifier that can be used to cancel, enable or disable the watcher. + */ + public static function onWritable(mixed $stream, callable $callback): string + { + return self::getDriver()->onWritable($stream, $callback); + } + + /** + * Execute a callback when a signal is received. + * + * Warning: Installing the same signal on different instances of this interface is deemed undefined behavior. + * Implementations MAY try to detect this, if possible, but are not required to. This is due to technical + * limitations of the signals being registered globally per process. + * + * Multiple watchers on the same signal MAY be executed in any order. + * + * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) + * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param int $signo The signal number to monitor. + * @param callable(string, int) $callback The callback to execute. + * + * @return string An unique identifier that can be used to cancel, enable or disable the watcher. + * + * @throws UnsupportedFeatureException If signal handling is not supported. + */ + public static function onSignal(int $signo, callable $callback): string + { + return self::getDriver()->onSignal($signo, $callback); + } + + /** + * Enable a watcher to be active starting in the next tick. + * + * Watchers MUST immediately be marked as enabled, but only be activated (i.e. callbacks can be called) right before + * the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param string $watcherId The watcher identifier. + * + * @return string The watcher identifier. + * + * @throws InvalidWatcherError If the watcher identifier is invalid. + */ + public static function enable(string $watcherId): string + { + return self::getDriver()->enable($watcherId); + } + + /** + * Disable a watcher immediately. + * + * A watcher MUST be disabled immediately, e.g. if a defer watcher disables a later defer watcher, the second defer + * watcher isn't executed in this tick. + * + * Disabling a watcher MUST NOT invalidate the watcher. Calling this function MUST NOT fail, even if passed an + * invalid watcher. + * + * @param string $watcherId The watcher identifier. + * + * @return string The watcher identifier. + */ + public static function disable(string $watcherId): string + { + return self::getDriver()->disable($watcherId); + } + + /** + * Cancel a watcher. + * + * This will detach the event loop from all resources that are associated to the watcher. After this operation the + * watcher is permanently invalid. Calling this function MUST NOT fail, even if passed an invalid watcher. + * + * @param string $watcherId The watcher identifier. + */ + public static function cancel(string $watcherId): void + { + self::getDriver()->cancel($watcherId); + } + + /** + * Reference a watcher. + * + * This will keep the event loop alive whilst the watcher is still being monitored. Watchers have this state by + * default. + * + * @param string $watcherId The watcher identifier. + * + * @return string The watcher identifier. + * + * @throws InvalidWatcherError If the watcher identifier is invalid. + */ + public static function reference(string $watcherId): string + { + return self::getDriver()->reference($watcherId); + } + + /** + * Unreference a watcher. + * + * The event loop should exit the run method when only unreferenced watchers are still being monitored. Watchers + * are all referenced by default. + * + * @param string $watcherId The watcher identifier. + * + * @return string The watcher identifier. + */ + public static function unreference(string $watcherId): string + { + return self::getDriver()->unreference($watcherId); + } + + /** + * Set a callback to be executed when an error occurs. + * + * The callback receives the error as the first and only parameter. The return value of the callback gets ignored. + * If it can't handle the error, it MUST throw the error. Errors thrown by the callback or during its invocation + * MUST be thrown into the `run` loop and stop the driver. + * + * Subsequent calls to this method will overwrite the previous handler. + * + * @param callable(\Throwable)|null $callback The callback to execute. `null` will clear the + * current handler. + * + * @return callable(\Throwable)|null The previous handler, `null` if there was none. + */ + public static function setErrorHandler(callable $callback = null): ?callable + { + return self::getDriver()->setErrorHandler($callback); + } + + /** + * Retrieve an associative array of information about the event loop driver. + * + * The returned array MUST contain the following data describing the driver's currently registered watchers: + * + * [ + * "defer" => ["enabled" => int, "disabled" => int], + * "delay" => ["enabled" => int, "disabled" => int], + * "repeat" => ["enabled" => int, "disabled" => int], + * "on_readable" => ["enabled" => int, "disabled" => int], + * "on_writable" => ["enabled" => int, "disabled" => int], + * "on_signal" => ["enabled" => int, "disabled" => int], + * "enabled_watchers" => ["referenced" => int, "unreferenced" => int], + * "running" => bool + * ]; + * + * Implementations MAY optionally add more information in the array but at minimum the above `key => value` format + * MUST always be provided. + * + * @return array Statistics about the loop in the described format. + */ + public static function getInfo(): array + { + return self::getDriver()->getInfo(); + } + + /** + * Retrieve the event loop driver that is in scope. + * + * @return Driver + */ + public static function getDriver(): Driver + { + /** @psalm-suppress RedundantPropertyInitializationCheck, RedundantCondition */ + if (!isset(self::$driver)) { + self::setDriver((new DriverFactory())->create()); + } + + return self::$driver; + } + + /** + * Create an object used to suspend and resume execution, either within a fiber or from {main}. + * + * @return Suspension + */ + public static function createSuspension(): Suspension + { + /** @psalm-suppress RedundantPropertyInitializationCheck */ + if (!self::$fiber || self::$fiber->isTerminated()) { + if (!\class_exists(\Fiber::class, false)) { + throw new \Error("Fibers required to create loop suspensions"); + } + + self::$fiber = self::createFiber(); + } + + return new Suspension(self::getDriver(), self::$fiber); + } + + /** + * Run the event loop. This function may only be called from {main}, that is, not within a fiber. + * + * This method will not return until the event loop contains no pending, referenced watchers. + */ + public static function run(): void + { + if (!\class_exists(\Fiber::class, false)) { + if (self::getDriver()->isRunning()) { + throw new \Error("The loop is already running"); + } + + self::getDriver()->run(); + return; + } + + if (\Fiber::getCurrent()) { + throw new \Error(\sprintf("Can't call %s() within a fiber (i.e., outside of {main})", __METHOD__)); + } + + if (!self::$fiber || self::$fiber->isTerminated()) { + self::$fiber = self::createFiber(); + } + + if (self::$fiber->isStarted()) { + self::$fiber->resume(); + } else { + self::$fiber->start(); + } + } + + /** + * Creates a fiber to run the active driver instance using {@see Driver::run()}. + * + * @return \Fiber Fiber used to run the event loop. + */ + private static function createFiber(): \Fiber + { + return new \Fiber([self::getDriver(), 'run']); + } + + /** + * Disable construction as this is a static class. + */ + private function __construct() + { + // intentionally left blank + } +} diff --git a/src/EventLoop/Driver.php b/src/EventLoop/Driver.php new file mode 100644 index 0000000..9b59234 --- /dev/null +++ b/src/EventLoop/Driver.php @@ -0,0 +1,284 @@ + ["enabled" => int, "disabled" => int], + * "delay" => ["enabled" => int, "disabled" => int], + * "repeat" => ["enabled" => int, "disabled" => int], + * "on_readable" => ["enabled" => int, "disabled" => int], + * "on_writable" => ["enabled" => int, "disabled" => int], + * "on_signal" => ["enabled" => int, "disabled" => int], + * "enabled_watchers" => ["referenced" => int, "unreferenced" => int], + * ]; + * + * Implementations MAY optionally add more information in the array but at minimum the above `key => value` format + * MUST always be provided. + * + * @return array Statistics about the loop in the described format. + */ + public function getInfo(): array; +} diff --git a/src/EventLoop/Driver/EvDriver.php b/src/EventLoop/Driver/EvDriver.php new file mode 100644 index 0000000..fb0164a --- /dev/null +++ b/src/EventLoop/Driver/EvDriver.php @@ -0,0 +1,226 @@ +handle = new \EvLoop(); + + if (self::$activeSignals === null) { + self::$activeSignals = &$this->signals; + } + + $this->ioCallback = function (\EvIO $event): void { + /** @var StreamWatcher $watcher */ + $watcher = $event->data; + + $this->invokeCallback($watcher); + }; + + $this->timerCallback = function (\EvTimer $event): void { + /** @var TimerWatcher $watcher */ + $watcher = $event->data; + + if (!$watcher->repeat) { + $this->cancel($watcher->id); + } else { + // Disable and re-enable so it's not executed repeatedly in the same tick + // See https://github.com/amphp/amp/issues/131 + $this->disable($watcher->id); + $this->enable($watcher->id); + } + + $this->invokeCallback($watcher); + }; + + $this->signalCallback = function (\EvSignal $event): void { + /** @var SignalWatcher $watcher */ + $watcher = $event->data; + + $this->invokeCallback($watcher); + }; + } + + /** + * {@inheritdoc} + */ + public function cancel(string $watcherId): void + { + parent::cancel($watcherId); + unset($this->events[$watcherId]); + } + + public function __destruct() + { + foreach ($this->events as $event) { + /** @psalm-suppress all */ + if ($event !== null) { // Events may have been nulled in extension depending on destruct order. + $event->stop(); + } + } + + // We need to clear all references to events manually, see + // https://bitbucket.org/osmanov/pecl-ev/issues/31/segfault-in-ev_timer_stop + $this->events = []; + } + + /** + * {@inheritdoc} + */ + public function run(): void + { + $active = self::$activeSignals; + + \assert($active !== null); + + foreach ($active as $event) { + $event->stop(); + } + + self::$activeSignals = &$this->signals; + + foreach ($this->signals as $event) { + $event->start(); + } + + try { + parent::run(); + } finally { + foreach ($this->signals as $event) { + $event->stop(); + } + + self::$activeSignals = &$active; + + foreach ($active as $event) { + $event->start(); + } + } + } + + /** + * {@inheritdoc} + */ + public function stop(): void + { + $this->handle->stop(); + parent::stop(); + } + + /** + * {@inheritdoc} + */ + public function getHandle(): \EvLoop + { + return $this->handle; + } + + protected function now(): float + { + return (float) \hrtime(true) / 1_000_000_000; + } + + /** + * {@inheritdoc} + */ + protected function dispatch(bool $blocking): void + { + $this->handle->run($blocking ? \Ev::RUN_ONCE : \Ev::RUN_ONCE | \Ev::RUN_NOWAIT); + } + + /** + * {@inheritdoc} + */ + protected function activate(array $watchers): void + { + $this->handle->nowUpdate(); + $now = $this->now(); + + foreach ($watchers as $watcher) { + if (!isset($this->events[$id = $watcher->id])) { + if ($watcher instanceof StreamReadWatcher) { + \assert(\is_resource($watcher->stream)); + + $this->events[$id] = $this->handle->io($watcher->stream, \Ev::READ, $this->ioCallback, $watcher); + } elseif ($watcher instanceof StreamWriteWatcher) { + \assert(\is_resource($watcher->stream)); + + $this->events[$id] = $this->handle->io( + $watcher->stream, + \Ev::WRITE, + $this->ioCallback, + $watcher + ); + } elseif ($watcher instanceof TimerWatcher) { + $interval = $watcher->interval; + $this->events[$id] = $this->handle->timer( + \max(0, ($watcher->expiration - $now)), + $watcher->repeat ? $interval : 0, + $this->timerCallback, + $watcher + ); + } elseif ($watcher instanceof SignalWatcher) { + $this->events[$id] = $this->handle->signal($watcher->signal, $this->signalCallback, $watcher); + } else { + // @codeCoverageIgnoreStart + throw new \Error("Unknown watcher type: " . \get_class($watcher)); + // @codeCoverageIgnoreEnd + } + } else { + $this->events[$id]->start(); + } + + if ($watcher instanceof SignalWatcher) { + /** @psalm-suppress PropertyTypeCoercion */ + $this->signals[$id] = $this->events[$id]; + } + } + } + + protected function deactivate(Watcher $watcher): void + { + if (isset($this->events[$id = $watcher->id])) { + $this->events[$id]->stop(); + + if ($watcher instanceof SignalWatcher) { + unset($this->signals[$id]); + } + } + } +} diff --git a/src/EventLoop/Driver/EventDriver.php b/src/EventLoop/Driver/EventDriver.php new file mode 100644 index 0000000..0c51f91 --- /dev/null +++ b/src/EventLoop/Driver/EventDriver.php @@ -0,0 +1,247 @@ +handle = new \EventBase(); + + if (self::$activeSignals === null) { + self::$activeSignals = &$this->signals; + } + + $this->ioCallback = function ($resource, $what, StreamWatcher $watcher): void { + \assert(\is_resource($watcher->stream)); + + $this->invokeCallback($watcher); + }; + + $this->timerCallback = function ($resource, $what, TimerWatcher $watcher): void { + if ($watcher->repeat) { + $this->events[$watcher->id]->add($watcher->interval); + } else { + $this->cancel($watcher->id); + } + + $this->invokeCallback($watcher); + }; + + $this->signalCallback = function ($signo, $what, SignalWatcher $watcher): void { + $this->invokeCallback($watcher); + }; + } + + /** + * {@inheritdoc} + */ + public function cancel(string $watcherId): void + { + parent::cancel($watcherId); + + if (isset($this->events[$watcherId])) { + $this->events[$watcherId]->free(); + unset($this->events[$watcherId]); + } + } + + /** + * @codeCoverageIgnore + */ + public function __destruct() + { + foreach ($this->events as $event) { + if ($event !== null) { // Events may have been nulled in extension depending on destruct order. + $event->free(); + } + } + + // Unset here, otherwise $event->del() fails with a warning, because __destruct order isn't defined. + // See https://github.com/amphp/amp/issues/159. + $this->events = []; + + // Manually free the loop handle to fully release loop resources. + // See https://github.com/amphp/amp/issues/177. + /** @psalm-suppress RedundantPropertyInitializationCheck */ + if (isset($this->handle)) { + $this->handle->free(); + unset($this->handle); + } + } + + /** + * {@inheritdoc} + */ + public function run(): void + { + $active = self::$activeSignals; + + \assert($active !== null); + + foreach ($active as $event) { + $event->del(); + } + + self::$activeSignals = &$this->signals; + + foreach ($this->signals as $event) { + /** @psalm-suppress TooFewArguments https://github.com/JetBrains/phpstorm-stubs/pull/763 */ + $event->add(); + } + + try { + parent::run(); + } finally { + foreach ($this->signals as $event) { + $event->del(); + } + + self::$activeSignals = &$active; + + foreach ($active as $event) { + /** @psalm-suppress TooFewArguments https://github.com/JetBrains/phpstorm-stubs/pull/763 */ + $event->add(); + } + } + } + + /** + * {@inheritdoc} + */ + public function stop(): void + { + $this->handle->stop(); + parent::stop(); + } + + /** + * {@inheritdoc} + */ + public function getHandle(): \EventBase + { + return $this->handle; + } + + protected function now(): float + { + return (float) \hrtime(true) / 1_000_000_000; + } + + /** + * {@inheritdoc} + */ + protected function dispatch(bool $blocking): void + { + $this->handle->loop($blocking ? \EventBase::LOOP_ONCE : \EventBase::LOOP_ONCE | \EventBase::LOOP_NONBLOCK); + } + + /** + * {@inheritdoc} + */ + protected function activate(array $watchers): void + { + $now = $this->now(); + + foreach ($watchers as $watcher) { + if (!isset($this->events[$id = $watcher->id])) { + if ($watcher instanceof StreamReadWatcher) { + \assert(\is_resource($watcher->stream)); + + $this->events[$id] = new \Event( + $this->handle, + $watcher->stream, + \Event::READ | \Event::PERSIST, + $this->ioCallback, + $watcher + ); + } elseif ($watcher instanceof StreamWriteWatcher) { + \assert(\is_resource($watcher->stream)); + + $this->events[$id] = new \Event( + $this->handle, + $watcher->stream, + \Event::WRITE | \Event::PERSIST, + $this->ioCallback, + $watcher + ); + } elseif ($watcher instanceof TimerWatcher) { + $this->events[$id] = new \Event( + $this->handle, + -1, + \Event::TIMEOUT, + $this->timerCallback, + $watcher + ); + } elseif ($watcher instanceof SignalWatcher) { + $this->events[$id] = new \Event( + $this->handle, + $watcher->signal, + \Event::SIGNAL | \Event::PERSIST, + $this->signalCallback, + $watcher + ); + } else { + // @codeCoverageIgnoreStart + throw new \Error("Unknown watcher type"); + // @codeCoverageIgnoreEnd + } + } + + if ($watcher instanceof TimerWatcher) { + $interval = \max(0, $watcher->expiration - $now); + $this->events[$id]->add($interval > 0 ? $interval : 0); + } elseif ($watcher instanceof SignalWatcher) { + $this->signals[$id] = $this->events[$id]; + /** @psalm-suppress TooFewArguments https://github.com/JetBrains/phpstorm-stubs/pull/763 */ + $this->events[$id]->add(); + } else { + /** @psalm-suppress TooFewArguments https://github.com/JetBrains/phpstorm-stubs/pull/763 */ + $this->events[$id]->add(); + } + } + } + + /** + * {@inheritdoc} + */ + protected function deactivate(Watcher $watcher): void + { + if (isset($this->events[$id = $watcher->id])) { + $this->events[$id]->del(); + + if ($watcher instanceof SignalWatcher) { + unset($this->signals[$id]); + } + } + } +} diff --git a/src/EventLoop/Driver/StreamSelectDriver.php b/src/EventLoop/Driver/StreamSelectDriver.php new file mode 100644 index 0000000..8a91628 --- /dev/null +++ b/src/EventLoop/Driver/StreamSelectDriver.php @@ -0,0 +1,325 @@ +timerQueue = new TimerQueue(); + $this->signalHandling = \extension_loaded("pcntl"); + + $this->streamSelectErrorHandler = function ($errno, $message) { + // Casing changed in PHP 8 from 'unable' to 'Unable' + if (\stripos($message, "stream_select(): unable to select [4]: ") === 0) { // EINTR + $this->streamSelectIgnoreResult = true; + + return; + } + + if (\str_contains($message, 'FD_SETSIZE')) { + $message = \str_replace(["\r\n", "\n", "\r"], " ", $message); + $pattern = '(stream_select\(\): You MUST recompile PHP with a larger value of FD_SETSIZE. It is set to (\d+), but you have descriptors numbered at least as high as (\d+)\.)'; + + if (\preg_match($pattern, $message, $match)) { + $helpLink = 'https://github.com/revoltphp/event-loop/tree/main/docs#implementations'; + + $message = 'You have reached the limits of stream_select(). It has a FD_SETSIZE of ' . $match[1] + . ', but you have file descriptors numbered at least as high as ' . $match[2] . '. ' + . "You can install one of the extensions listed on {$helpLink} to support a higher number of " + . "concurrent file descriptors. If a large number of open file descriptors is unexpected, you " + . "might be leaking file descriptors that aren't closed correctly."; + } + } + + throw new \Exception($message, $errno); + }; + } + + /** + * {@inheritdoc} + * + * @throws UnsupportedFeatureException If the pcntl extension is not available. + */ + public function onSignal(int $signo, callable $callback): string + { + if (!$this->signalHandling) { + throw new UnsupportedFeatureException("Signal handling requires the pcntl extension"); + } + + return parent::onSignal($signo, $callback); + } + + /** + * {@inheritdoc} + */ + public function getHandle(): mixed + { + return null; + } + + protected function now(): float + { + return (float) \hrtime(true) / 1_000_000_000; + } + + /** + * @throws \Throwable + */ + protected function dispatch(bool $blocking): void + { + $this->selectStreams( + $this->readStreams, + $this->writeStreams, + $blocking ? $this->getTimeout() : 0.0 + ); + + $now = $this->now(); + + while ($watcher = $this->timerQueue->extract($now)) { + if ($watcher->repeat) { + $watcher->enabled = false; // Trick base class into adding to enable queue when calling enable() + $this->enable($watcher->id); + } else { + $this->cancel($watcher->id); + } + + $this->invokeCallback($watcher); + } + + if ($this->signalHandling) { + \pcntl_signal_dispatch(); + } + } + + /** + * {@inheritdoc} + */ + protected function activate(array $watchers): void + { + foreach ($watchers as $watcher) { + if ($watcher instanceof StreamReadWatcher) { + \assert(\is_resource($watcher->stream)); + + $streamId = (int) $watcher->stream; + $this->readWatchers[$streamId][$watcher->id] = $watcher; + $this->readStreams[$streamId] = $watcher->stream; + } elseif ($watcher instanceof StreamWriteWatcher) { + \assert(\is_resource($watcher->stream)); + + $streamId = (int) $watcher->stream; + $this->writeWatchers[$streamId][$watcher->id] = $watcher; + $this->writeStreams[$streamId] = $watcher->stream; + } elseif ($watcher instanceof TimerWatcher) { + $this->timerQueue->insert($watcher); + } elseif ($watcher instanceof SignalWatcher) { + if (!isset($this->signalWatchers[$watcher->signal])) { + if (!@\pcntl_signal($watcher->signal, \Closure::fromCallable([$this, 'handleSignal']))) { + $message = "Failed to register signal handler"; + if ($error = \error_get_last()) { + $message .= \sprintf("; Errno: %d; %s", $error["type"], $error["message"]); + } + throw new \Error($message); + } + } + + $this->signalWatchers[$watcher->signal][$watcher->id] = $watcher; + } else { + // @codeCoverageIgnoreStart + throw new \Error("Unknown watcher type"); + // @codeCoverageIgnoreEnd + } + } + } + + /** + * {@inheritdoc} + */ + protected function deactivate(Watcher $watcher): void + { + if ($watcher instanceof StreamReadWatcher) { + $streamId = (int) $watcher->stream; + unset($this->readWatchers[$streamId][$watcher->id]); + if (empty($this->readWatchers[$streamId])) { + unset($this->readWatchers[$streamId], $this->readStreams[$streamId]); + } + } elseif ($watcher instanceof StreamWriteWatcher) { + $streamId = (int) $watcher->stream; + unset($this->writeWatchers[$streamId][$watcher->id]); + if (empty($this->writeWatchers[$streamId])) { + unset($this->writeWatchers[$streamId], $this->writeStreams[$streamId]); + } + } elseif ($watcher instanceof TimerWatcher) { + $this->timerQueue->remove($watcher); + } elseif ($watcher instanceof SignalWatcher) { + if (isset($this->signalWatchers[$watcher->signal])) { + unset($this->signalWatchers[$watcher->signal][$watcher->id]); + + if (empty($this->signalWatchers[$watcher->signal])) { + unset($this->signalWatchers[$watcher->signal]); + @\pcntl_signal($watcher->signal, \SIG_DFL); + } + } + } else { + // @codeCoverageIgnoreStart + throw new \Error("Unknown watcher type"); + // @codeCoverageIgnoreEnd + } + } + + /** + * @param resource[]|object[] $read + * @param resource[]|object[] $write + */ + private function selectStreams(array $read, array $write, float $timeout): void + { + if (!empty($read) || !empty($write)) { // Use stream_select() if there are any streams in the loop. + if ($timeout >= 0) { + $seconds = (int) $timeout; + $microseconds = (int) (($timeout - $seconds) * 1_000_000); + } else { + $seconds = null; + $microseconds = null; + } + + // Failed connection attempts are indicated via except on Windows + // @link https://github.com/reactphp/event-loop/blob/8bd064ce23c26c4decf186c2a5a818c9a8209eb0/src/StreamSelectLoop.php#L279-L287 + // @link https://docs.microsoft.com/de-de/windows/win32/api/winsock2/nf-winsock2-select + $except = null; + if (\DIRECTORY_SEPARATOR === '\\') { + $except = $write; + } + + \set_error_handler($this->streamSelectErrorHandler); + + try { + /** @psalm-suppress InvalidArgument */ + $result = \stream_select($read, $write, $except, $seconds, $microseconds); + } finally { + \restore_error_handler(); + } + + if ($this->streamSelectIgnoreResult || $result === 0) { + $this->streamSelectIgnoreResult = false; + return; + } + + if (!$result) { + $this->error(new \Exception('Unknown error during stream_select')); + return; + } + + foreach ($read as $stream) { + $streamId = (int) $stream; + if (!isset($this->readWatchers[$streamId])) { + continue; // All read watchers disabled. + } + + foreach ($this->readWatchers[$streamId] as $watcher) { + if (!isset($this->readWatchers[$streamId][$watcher->id])) { + continue; // Watcher disabled by another IO watcher. + } + + $this->invokeCallback($watcher); + } + } + + \assert(\is_array($write)); // See https://github.com/vimeo/psalm/issues/3036 + + if ($except) { + foreach ($except as $key => $socket) { + $write[$key] = $socket; + } + } + + foreach ($write as $stream) { + $streamId = (int) $stream; + if (!isset($this->writeWatchers[$streamId])) { + continue; // All write watchers disabled. + } + + foreach ($this->writeWatchers[$streamId] as $watcher) { + if (!isset($this->writeWatchers[$streamId][$watcher->id])) { + continue; // Watcher disabled by another IO watcher. + } + + $this->invokeCallback($watcher); + } + } + + return; + } + + if ($timeout < 0) { // Only signal watchers are enabled, so sleep indefinitely. + \usleep(\PHP_INT_MAX); + return; + } + + if ($timeout > 0) { // Sleep until next timer expires. + \usleep((int) ($timeout * 1_000_000)); + } + } + + /** + * @return float Seconds until next timer expires or -1 if there are no pending timers. + */ + private function getTimeout(): float + { + $expiration = $this->timerQueue->peek(); + + if ($expiration === null) { + return -1; + } + + $expiration -= $this->now(); + + return $expiration > 0 ? $expiration : 0.0; + } + + private function handleSignal(int $signo): void + { + foreach ($this->signalWatchers[$signo] as $watcher) { + if (!isset($this->signalWatchers[$signo][$watcher->id])) { + continue; + } + + $this->invokeCallback($watcher); + } + } +} diff --git a/src/EventLoop/Driver/TracingDriver.php b/src/EventLoop/Driver/TracingDriver.php new file mode 100644 index 0000000..1e53f06 --- /dev/null +++ b/src/EventLoop/Driver/TracingDriver.php @@ -0,0 +1,250 @@ +driver = $driver; + } + + public function run(): void + { + $this->driver->run(); + } + + public function stop(): void + { + $this->driver->stop(); + } + + public function isRunning(): bool + { + return $this->driver->isRunning(); + } + + public function defer(callable $callback): string + { + $id = $this->driver->defer(function (...$args) use ($callback) { + $this->cancel($args[0]); + return $callback(...$args); + }); + + $this->creationTraces[$id] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS)); + $this->enabledWatchers[$id] = true; + + return $id; + } + + public function delay(float $delay, callable $callback): string + { + $id = $this->driver->delay($delay, function (...$args) use ($callback) { + $this->cancel($args[0]); + return $callback(...$args); + }); + + $this->creationTraces[$id] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS)); + $this->enabledWatchers[$id] = true; + + return $id; + } + + public function repeat(float $interval, callable $callback): string + { + $id = $this->driver->repeat($interval, $callback); + + $this->creationTraces[$id] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS)); + $this->enabledWatchers[$id] = true; + + return $id; + } + + public function onReadable(mixed $stream, callable $callback): string + { + $id = $this->driver->onReadable($stream, $callback); + + $this->creationTraces[$id] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS)); + $this->enabledWatchers[$id] = true; + + return $id; + } + + public function onWritable(mixed $stream, callable $callback): string + { + $id = $this->driver->onWritable($stream, $callback); + + $this->creationTraces[$id] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS)); + $this->enabledWatchers[$id] = true; + + return $id; + } + + public function onSignal(int $signo, callable $callback): string + { + $id = $this->driver->onSignal($signo, $callback); + + $this->creationTraces[$id] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS)); + $this->enabledWatchers[$id] = true; + + return $id; + } + + public function enable(string $watcherId): string + { + try { + $this->driver->enable($watcherId); + $this->enabledWatchers[$watcherId] = true; + } catch (InvalidWatcherError $e) { + throw new InvalidWatcherError( + $watcherId, + $e->getMessage() . "\r\n\r\n" . $this->getTraces($watcherId) + ); + } + + return $watcherId; + } + + public function cancel(string $watcherId): void + { + $this->driver->cancel($watcherId); + + if (!isset($this->cancelTraces[$watcherId])) { + $this->cancelTraces[$watcherId] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS)); + } + + unset($this->enabledWatchers[$watcherId], $this->unreferencedWatchers[$watcherId]); + } + + public function disable(string $watcherId): string + { + $this->driver->disable($watcherId); + unset($this->enabledWatchers[$watcherId]); + + return $watcherId; + } + + public function reference(string $watcherId): string + { + try { + $this->driver->reference($watcherId); + unset($this->unreferencedWatchers[$watcherId]); + } catch (InvalidWatcherError $e) { + throw new InvalidWatcherError( + $watcherId, + $e->getMessage() . "\r\n\r\n" . $this->getTraces($watcherId) + ); + } + + return $watcherId; + } + + public function unreference(string $watcherId): string + { + $this->driver->unreference($watcherId); + $this->unreferencedWatchers[$watcherId] = true; + + return $watcherId; + } + + public function setErrorHandler(callable $callback = null): ?callable + { + return $this->driver->setErrorHandler($callback); + } + + /** @inheritdoc */ + public function getHandle(): mixed + { + return $this->driver->getHandle(); + } + + public function dump(): string + { + $dump = "Enabled, referenced watchers keeping the loop running: "; + + foreach ($this->enabledWatchers as $watcher => $_) { + if (isset($this->unreferencedWatchers[$watcher])) { + continue; + } + + $dump .= "Watcher ID: " . $watcher . "\r\n"; + $dump .= $this->getCreationTrace($watcher); + $dump .= "\r\n\r\n"; + } + + return \rtrim($dump); + } + + public function getInfo(): array + { + return $this->driver->getInfo(); + } + + public function __debugInfo(): array + { + return $this->driver->__debugInfo(); + } + + public function queue(callable $callback, mixed ...$args): void + { + $this->driver->queue($callback, ...$args); + } + + private function getTraces(string $watcherId): string + { + return "Creation Trace:\r\n" . $this->getCreationTrace($watcherId) . "\r\n\r\n" . + "Cancellation Trace:\r\n" . $this->getCancelTrace($watcherId); + } + + private function getCreationTrace(string $watcher): string + { + return $this->creationTraces[$watcher] ?? 'No creation trace, yet.'; + } + + private function getCancelTrace(string $watcher): string + { + return $this->cancelTraces[$watcher] ?? 'No cancellation trace, yet.'; + } + + /** + * Formats a stacktrace obtained via `debug_backtrace()`. + * + * @param array $trace Output of + * `debug_backtrace()`. + * + * @return string Formatted stacktrace. + */ + private function formatStacktrace(array $trace): string + { + return \implode("\n", \array_map(static function ($e, $i) { + $line = "#{$i} "; + + if (isset($e["file"])) { + $line .= "{$e['file']}:{$e['line']} "; + } + + if (isset($e["class"], $e["type"])) { + $line .= $e["class"] . $e["type"]; + } + + return $line . $e["function"] . "()"; + }, $trace, \array_keys($trace))); + } +} diff --git a/src/EventLoop/Driver/UvDriver.php b/src/EventLoop/Driver/UvDriver.php new file mode 100644 index 0000000..9400e7d --- /dev/null +++ b/src/EventLoop/Driver/UvDriver.php @@ -0,0 +1,269 @@ +handle = \uv_loop_new(); + + $this->ioCallback = function ($event, $status, $events, $resource): void { + $watchers = $this->watchers[(int) $event]; + + // Invoke the callback on errors, as this matches behavior with other loop back-ends. + // Re-enable watcher as libuv disables the watcher on non-zero status. + if ($status !== 0) { + $flags = 0; + foreach ($watchers as $watcher) { + \assert($watcher instanceof StreamWatcher); + + $flags |= $watcher->enabled ? $this->getStreamWatcherFlags($watcher) : 0; + } + \uv_poll_start($event, $flags, $this->ioCallback); + } + + foreach ($watchers as $watcher) { + \assert($watcher instanceof StreamWatcher); + + // $events is ORed with 4 to trigger watcher if no events are indicated (0) or on UV_DISCONNECT (4). + // http://docs.libuv.org/en/v1.x/poll.html + if (!($watcher->enabled && ($this->getStreamWatcherFlags($watcher) & $events || ($events | 4) === 4))) { + continue; + } + + $this->invokeCallback($watcher); + } + }; + + $this->timerCallback = function ($event): void { + $watcher = $this->watchers[(int) $event][0]; + + \assert($watcher instanceof TimerWatcher); + + if (!$watcher->repeat) { + unset($this->events[$watcher->id], $this->watchers[(int) $event]); // Avoid call to uv_is_active(). + $this->cancel($watcher->id); // Remove reference to watcher in parent. + } else { + // Disable and re-enable so it's not executed repeatedly in the same tick + // See https://github.com/amphp/amp/issues/131 + $this->disable($watcher->id); + $this->enable($watcher->id); + } + + $this->invokeCallback($watcher); + }; + + $this->signalCallback = function ($event, $signo): void { + $watcher = $this->watchers[(int) $event][0]; + + $this->invokeCallback($watcher); + }; + } + + /** + * {@inheritdoc} + */ + public function cancel(string $watcherId): void + { + parent::cancel($watcherId); + + if (!isset($this->events[$watcherId])) { + return; + } + + $event = $this->events[$watcherId]; + $eventId = (int) $event; + + if (isset($this->watchers[$eventId][0])) { // All except IO watchers. + unset($this->watchers[$eventId]); + } elseif (isset($this->watchers[$eventId][$watcherId])) { + $watcher = $this->watchers[$eventId][$watcherId]; + unset($this->watchers[$eventId][$watcherId]); + + \assert($watcher instanceof StreamWatcher); + + if (empty($this->watchers[$eventId])) { + unset($this->watchers[$eventId], $this->streams[(int) $watcher->stream]); + } + } + + unset($this->events[$watcherId]); + } + + /** + * {@inheritdoc} + */ + public function getHandle(): mixed + { + return $this->handle; + } + + protected function now(): float + { + \uv_update_time($this->handle); + + /** @psalm-suppress TooManyArguments */ + return \uv_now($this->handle) / 1000; + } + + /** + * {@inheritdoc} + */ + protected function dispatch(bool $blocking): void + { + /** @psalm-suppress TooManyArguments */ + \uv_run($this->handle, $blocking ? \UV::RUN_ONCE : \UV::RUN_NOWAIT); + } + + /** + * {@inheritdoc} + */ + protected function activate(array $watchers): void + { + $now = $this->now(); + + foreach ($watchers as $watcher) { + $id = $watcher->id; + + if ($watcher instanceof StreamWatcher) { + \assert(\is_resource($watcher->stream)); + + $streamId = (int) $watcher->stream; + + if (isset($this->streams[$streamId])) { + $event = $this->streams[$streamId]; + } elseif (isset($this->events[$id])) { + $event = $this->streams[$streamId] = $this->events[$id]; + } else { + /** @psalm-suppress TooManyArguments */ + $event = $this->streams[$streamId] = \uv_poll_init_socket($this->handle, $watcher->stream); + } + + $eventId = (int) $event; + $this->events[$id] = $event; + $this->watchers[$eventId][$id] = $watcher; + + $flags = 0; + foreach ($this->watchers[$eventId] as $w) { + \assert($w instanceof StreamWatcher); + + $flags |= $w->enabled ? ($this->getStreamWatcherFlags($w)) : 0; + } + \uv_poll_start($event, $flags, $this->ioCallback); + } elseif ($watcher instanceof TimerWatcher) { + if (isset($this->events[$id])) { + $event = $this->events[$id]; + } else { + $event = $this->events[$id] = \uv_timer_init($this->handle); + } + + $this->watchers[(int) $event] = [$watcher]; + + \uv_timer_start( + $event, + (int) \ceil(\max(0, $watcher->expiration - $now) * 1000), + $watcher->repeat ? (int) \ceil($watcher->interval * 1000) : 0, + $this->timerCallback + ); + } elseif ($watcher instanceof SignalWatcher) { + if (isset($this->events[$id])) { + $event = $this->events[$id]; + } else { + /** @psalm-suppress TooManyArguments */ + $event = $this->events[$id] = \uv_signal_init($this->handle); + } + + $this->watchers[(int) $event] = [$watcher]; + + /** @psalm-suppress TooManyArguments */ + \uv_signal_start($event, $this->signalCallback, $watcher->signal); + } else { + // @codeCoverageIgnoreStart + throw new \Error("Unknown watcher type"); + // @codeCoverageIgnoreEnd + } + } + } + + /** + * {@inheritdoc} + */ + protected function deactivate(Watcher $watcher): void + { + $id = $watcher->id; + + if (!isset($this->events[$id])) { + return; + } + + $event = $this->events[$id]; + + if (!\uv_is_active($event)) { + return; + } + + if ($watcher instanceof StreamWatcher) { + $flags = 0; + foreach ($this->watchers[(int) $event] as $w) { + \assert($w instanceof StreamWatcher); + + $flags |= $w->enabled ? ($this->getStreamWatcherFlags($w)) : 0; + } + + if ($flags) { + \uv_poll_start($event, $flags, $this->ioCallback); + } else { + \uv_poll_stop($event); + } + } elseif ($watcher instanceof TimerWatcher) { + \uv_timer_stop($event); + } elseif ($watcher instanceof SignalWatcher) { + \uv_signal_stop($event); + } else { + // @codeCoverageIgnoreStart + throw new \Error("Unknown watcher type"); + // @codeCoverageIgnoreEnd + } + } + + private function getStreamWatcherFlags(StreamWatcher $watcher): int + { + if ($watcher instanceof StreamWriteWatcher) { + return \UV::WRITABLE; + } + + if ($watcher instanceof StreamReadWatcher) { + return \UV::READABLE; + } + + throw new \Error('Invalid watcher type'); + } +} diff --git a/src/EventLoop/DriverFactory.php b/src/EventLoop/DriverFactory.php new file mode 100644 index 0000000..130269f --- /dev/null +++ b/src/EventLoop/DriverFactory.php @@ -0,0 +1,79 @@ +createDriverFromEnv()) { + return $driver; + } + + if (UvDriver::isSupported()) { + return new UvDriver(); + } + + if (EvDriver::isSupported()) { + return new EvDriver(); + } + + if (EventDriver::isSupported()) { + return new EventDriver(); + } + + return new StreamSelectDriver(); + })(); + + if (\getenv("REVOLT_DEBUG_TRACE_WATCHERS")) { + return new TracingDriver($driver); + } + + return $driver; + } + + /** + * @return Driver|null + */ + private function createDriverFromEnv(): ?Driver + { + $driver = \getenv("REVOLT_LOOP_DRIVER"); + + if (!$driver) { + return null; + } + + if (!\class_exists($driver)) { + throw new \Error(\sprintf( + "Driver '%s' does not exist.", + $driver + )); + } + + if (!\is_subclass_of($driver, Driver::class)) { + throw new \Error(\sprintf( + "Driver '%s' is not a subclass of '%s'.", + $driver, + Driver::class + )); + } + + return new $driver(); + } +} +// @codeCoverageIgnoreEnd diff --git a/src/EventLoop/Internal/AbstractDriver.php b/src/EventLoop/Internal/AbstractDriver.php new file mode 100644 index 0000000..7d12735 --- /dev/null +++ b/src/EventLoop/Internal/AbstractDriver.php @@ -0,0 +1,693 @@ +>> */ + private array $microQueue = []; + + /** @var callable(\Throwable):void|null */ + private $errorHandler; + + private bool $running = false; + + public function __construct() + { + $this->fiber = $this->createFiber(); + } + + /** + * Run the event loop. + * + * One iteration of the loop is called one "tick". A tick covers the following steps: + * + * 1. Activate watchers created / enabled in the last tick / before `run()`. + * 2. Execute all enabled defer watchers. + * 3. Execute all due timer, pending signal and actionable stream callbacks, each only once per tick. + * + * The loop MUST continue to run until it is either stopped explicitly, no referenced watchers exist anymore, or an + * exception is thrown that cannot be handled. Exceptions that cannot be handled are exceptions thrown from an + * error handler or exceptions that would be passed to an error handler but none exists to handle them. + * + * @throw \Error Thrown if the event loop is already running. + */ + public function run(): void + { + if ($this->running) { + throw new \Error("The loop was already running"); + } + + $this->running = true; + + try { + while ($this->running) { + $this->invokeMicrotasks(); + + if ($this->isEmpty()) { + return; + } + + $this->tick(); + } + } finally { + $this->stop(); + } + } + + /** + * Stop the event loop. + * + * When an event loop is stopped, it continues with its current tick and exits the loop afterwards. Multiple calls + * to stop MUST be ignored and MUST NOT raise an exception. + */ + public function stop(): void + { + $this->running = false; + } + + /** + * @return bool True if the event loop is running, false if it is stopped. + */ + public function isRunning(): bool + { + return $this->running; + } + + /** + * Queue a microtask. + * + * The queued callable MUST be executed immediately once the event loop gains control. Order of queueing MUST be + * preserved when executing the callbacks. Recursive scheduling can thus result in infinite loops, use with care. + * + * Does NOT create a watcher, thus CAN NOT be marked as disabled or unreferenced. Use {@see EventLoop::defer()} if + * you need these features. + * + * @param callable $callback The callback to queue. + * @param mixed ...$args The callback arguments. + */ + public function queue(callable $callback, mixed ...$args): void + { + $this->microQueue[] = [$callback, $args]; + } + + /** + * Defer the execution of a callback. + * + * The deferred callable MUST be executed before any other type of watcher in a tick. Order of enabling MUST be + * preserved when executing the callbacks. + * + * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) + * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param callable(string):void $callback The callback to defer. The `$watcherId` will be + * invalidated before the callback call. + * + * @return string An unique identifier that can be used to cancel, enable or disable the watcher. + */ + public function defer(callable $callback): string + { + $watcher = new DeferWatcher($this->nextId++, $callback); + + $this->watchers[$watcher->id] = $watcher; + $this->nextTickQueue[$watcher->id] = $watcher; + + return $watcher->id; + } + + /** + * Delay the execution of a callback. + * + * The delay is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be determined by which + * timers expire first, but timers with the same expiration time MAY be executed in any order. + * + * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) + * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param float $delay The amount of time, in seconds, to delay the execution for. + * @param callable(string):void $callback The callback to delay. The `$watcherId` will be + * invalidated before the callback invocation. + * + * @return string An unique identifier that can be used to cancel, enable or disable the watcher. + */ + public function delay(float $delay, callable $callback): string + { + if ($delay < 0) { + throw new \Error("Delay must be greater than or equal to zero"); + } + + $watcher = new TimerWatcher($this->nextId++, $delay, $callback, $this->now() + $delay); + + $this->watchers[$watcher->id] = $watcher; + $this->enableQueue[$watcher->id] = $watcher; + + return $watcher->id; + } + + /** + * Repeatedly execute a callback. + * + * The interval between executions is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be + * determined by which timers expire first, but timers with the same expiration time MAY be executed in any order. + * The first execution is scheduled after the first interval period. + * + * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) + * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param float $interval The time interval, in seconds, to wait between executions. + * @param callable(string):void $callback The callback to repeat. + * + * @return string An unique identifier that can be used to cancel, enable or disable the watcher. + */ + public function repeat(float $interval, callable $callback): string + { + if ($interval < 0) { + throw new \Error("Interval must be greater than or equal to zero"); + } + + $watcher = new TimerWatcher($this->nextId++, $interval, $callback, $this->now() + $interval, true); + + $this->watchers[$watcher->id] = $watcher; + $this->enableQueue[$watcher->id] = $watcher; + + return $watcher->id; + } + + /** + * Execute a callback when a stream resource becomes readable or is closed for reading. + * + * Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the + * watcher when closing the resource locally. Drivers MAY choose to notify the user if there are watchers on invalid + * resources, but are not required to, due to the high performance impact. Watchers on closed resources are + * therefore undefined behavior. + * + * Multiple watchers on the same stream MAY be executed in any order. + * + * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) + * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param resource|object $stream The stream to monitor. + * @param callable(string, resource):void $callback The callback to execute. + * + * @return string An unique identifier that can be used to cancel, enable or disable the watcher. + */ + public function onReadable(mixed $stream, callable $callback): string + { + $watcher = new StreamReadWatcher($this->nextId++, $callback, $stream); + + $this->watchers[$watcher->id] = $watcher; + $this->enableQueue[$watcher->id] = $watcher; + + return $watcher->id; + } + + /** + * Execute a callback when a stream resource becomes writable or is closed for writing. + * + * Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the + * watcher when closing the resource locally. Drivers MAY choose to notify the user if there are watchers on invalid + * resources, but are not required to, due to the high performance impact. Watchers on closed resources are + * therefore undefined behavior. + * + * Multiple watchers on the same stream MAY be executed in any order. + * + * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) + * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param resource|object $stream The stream to monitor. + * @param callable(string, resource|object):void $callback The callback to execute. + * + * @return string An unique identifier that can be used to cancel, enable or disable the watcher. + */ + public function onWritable($stream, callable $callback): string + { + $watcher = new StreamWriteWatcher($this->nextId++, $callback, $stream); + + $this->watchers[$watcher->id] = $watcher; + $this->enableQueue[$watcher->id] = $watcher; + + return $watcher->id; + } + + /** + * Execute a callback when a signal is received. + * + * Warning: Installing the same signal on different instances of this interface is deemed undefined behavior. + * Implementations MAY try to detect this, if possible, but are not required to. This is due to technical + * limitations of the signals being registered globally per process. + * + * Multiple watchers on the same signal MAY be executed in any order. + * + * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) + * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param int $signo The signal number to monitor. + * @param callable(string, int):void $callback The callback to execute. + * + * @return string An unique identifier that can be used to cancel, enable or disable the watcher. + * + * @throws UnsupportedFeatureException If signal handling is not supported. + */ + public function onSignal(int $signo, callable $callback): string + { + $watcher = new SignalWatcher($this->nextId++, $callback, $signo); + + $this->watchers[$watcher->id] = $watcher; + $this->enableQueue[$watcher->id] = $watcher; + + return $watcher->id; + } + + /** + * Enable a watcher to be active starting in the next tick. + * + * Watchers MUST immediately be marked as enabled, but only be activated (i.e. callbacks can be called) right before + * the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. + * + * @param string $watcherId The watcher identifier. + * + * @return string The watcher identifier. + * + * @throws InvalidWatcherError If the watcher identifier is invalid. + */ + public function enable(string $watcherId): string + { + if (!isset($this->watchers[$watcherId])) { + throw new InvalidWatcherError($watcherId, "Cannot enable an invalid watcher identifier: '{$watcherId}'"); + } + + $watcher = $this->watchers[$watcherId]; + + if ($watcher->enabled) { + return $watcherId; // Watcher already enabled. + } + + $watcher->enabled = true; + + if ($watcher instanceof DeferWatcher) { + $this->nextTickQueue[$watcher->id] = $watcher; + } elseif ($watcher instanceof TimerWatcher) { + $watcher->expiration = $this->now() + $watcher->interval; + $this->enableQueue[$watcher->id] = $watcher; + } else { + $this->enableQueue[$watcher->id] = $watcher; + } + + return $watcherId; + } + + /** + * Cancel a watcher. + * + * This will detach the event loop from all resources that are associated to the watcher. After this operation the + * watcher is permanently invalid. Calling this function MUST NOT fail, even if passed an invalid watcher. + * + * @param string $watcherId The watcher identifier. + */ + public function cancel(string $watcherId): void + { + $this->disable($watcherId); + unset($this->watchers[$watcherId]); + } + + /** + * Disable a watcher immediately. + * + * A watcher MUST be disabled immediately, e.g. if a defer watcher disables a later defer watcher, the second defer + * watcher isn't executed in this tick. + * + * Disabling a watcher MUST NOT invalidate the watcher. Calling this function MUST NOT fail, even if passed an + * invalid watcher. + * + * @param string $watcherId The watcher identifier. + * + * @return string The watcher identifier. + */ + public function disable(string $watcherId): string + { + if (!isset($this->watchers[$watcherId])) { + return $watcherId; + } + + $watcher = $this->watchers[$watcherId]; + + if (!$watcher->enabled) { + return $watcherId; // Watcher already disabled. + } + + $watcher->enabled = false; + $id = $watcher->id; + + if ($watcher instanceof DeferWatcher) { + if (isset($this->nextTickQueue[$id])) { + // Watcher was only queued to be enabled. + unset($this->nextTickQueue[$id]); + } else { + unset($this->deferQueue[$id]); + } + } else { + if (isset($this->enableQueue[$id])) { + // Watcher was only queued to be enabled. + unset($this->enableQueue[$id]); + } else { + $this->deactivate($watcher); + } + } + + return $watcherId; + } + + /** + * Reference a watcher. + * + * This will keep the event loop alive whilst the watcher is still being monitored. Watchers have this state by + * default. + * + * @param string $watcherId The watcher identifier. + * + * @return string The watcher identifier. + * + * @throws InvalidWatcherError If the watcher identifier is invalid. + */ + public function reference(string $watcherId): string + { + if (!isset($this->watchers[$watcherId])) { + throw new InvalidWatcherError($watcherId, "Cannot reference an invalid watcher identifier: '{$watcherId}'"); + } + + $this->watchers[$watcherId]->referenced = true; + + return $watcherId; + } + + /** + * Unreference a watcher. + * + * The event loop should exit the run method when only unreferenced watchers are still being monitored. Watchers + * are all referenced by default. + * + * @param string $watcherId The watcher identifier. + * + * @return string The watcher identifier. + */ + public function unreference(string $watcherId): string + { + if (!isset($this->watchers[$watcherId])) { + return $watcherId; + } + + $this->watchers[$watcherId]->referenced = false; + + return $watcherId; + } + + /** + * Set a callback to be executed when an error occurs. + * + * The callback receives the error as the first and only parameter. The return value of the callback gets ignored. + * If it can't handle the error, it MUST throw the error. Errors thrown by the callback or during its invocation + * MUST be thrown into the `run` loop and stop the driver. + * + * Subsequent calls to this method will overwrite the previous handler. + * + * @param ?(callable(\Throwable):void) $callback The callback to execute. `null` will clear the + * current handler. + * + * @return ?(callable(\Throwable):void) The previous handler, `null` if there was none. + */ + public function setErrorHandler(callable $callback = null): ?callable + { + $previous = $this->errorHandler; + $this->errorHandler = $callback; + return $previous; + } + + /** + * Returns the same array of data as getInfo(). + * + * @return array + */ + public function __debugInfo(): array + { + // @codeCoverageIgnoreStart + return $this->getInfo(); + // @codeCoverageIgnoreEnd + } + + /** + * Retrieve an associative array of information about the event loop driver. + * + * The returned array MUST contain the following data describing the driver's currently registered watchers: + * + * [ + * "defer" => ["enabled" => int, "disabled" => int], + * "delay" => ["enabled" => int, "disabled" => int], + * "repeat" => ["enabled" => int, "disabled" => int], + * "on_readable" => ["enabled" => int, "disabled" => int], + * "on_writable" => ["enabled" => int, "disabled" => int], + * "on_signal" => ["enabled" => int, "disabled" => int], + * "enabled_watchers" => ["referenced" => int, "unreferenced" => int], + * ]; + * + * Implementations MAY optionally add more information in the array but at minimum the above `key => value` format + * MUST always be provided. + * + * @return array Statistics about the loop in the described format. + */ + public function getInfo(): array + { + $watchers = [ + "referenced" => 0, + "unreferenced" => 0, + ]; + + $defer = $delay = $repeat = $onReadable = $onWritable = $onSignal = [ + "enabled" => 0, + "disabled" => 0, + ]; + + foreach ($this->watchers as $watcher) { + if ($watcher instanceof StreamReadWatcher) { + $array = &$onReadable; + } elseif ($watcher instanceof StreamWriteWatcher) { + $array = &$onWritable; + } elseif ($watcher instanceof SignalWatcher) { + $array = &$onSignal; + } elseif ($watcher instanceof TimerWatcher) { + if ($watcher->repeat) { + $array = &$repeat; + } else { + $array = &$delay; + } + } elseif ($watcher instanceof DeferWatcher) { + $array = &$defer; + } else { + // @codeCoverageIgnoreStart + throw new \Error("Unknown watcher type"); + // @codeCoverageIgnoreEnd + } + + if ($watcher->enabled) { + ++$array["enabled"]; + + if ($watcher->referenced) { + ++$watchers["referenced"]; + } else { + ++$watchers["unreferenced"]; + } + } else { + ++$array["disabled"]; + } + } + + return [ + "enabled_watchers" => $watchers, + "defer" => $defer, + "delay" => $delay, + "repeat" => $repeat, + "on_readable" => $onReadable, + "on_writable" => $onWritable, + "on_signal" => $onSignal, + ]; + } + + /** + * Activates (enables) all the given watchers. + * + * @param Watcher[] $watchers + */ + abstract protected function activate(array $watchers): void; + + /** + * Dispatches any pending read/write, timer, and signal events. + */ + abstract protected function dispatch(bool $blocking): void; + + /** + * Deactivates (disables) the given watcher. + */ + abstract protected function deactivate(Watcher $watcher): void; + + protected function invokeCallback(Watcher $watcher): void + { + if ($this->fiber->isRunning()) { + $this->fiber = $this->createFiber(); + } + + try { + $yielded = $this->fiber->resume($watcher); + + if ($yielded !== $watcher) { + // Watcher callback suspended. + $this->fiber = $this->createFiber(); + } + } catch (\Throwable $exception) { + $this->fiber = $this->createFiber(); + $this->error($exception); + } + + if ($this->microQueue) { + $this->invokeMicrotasks(); + } + } + + /** + * Invokes the error handler with the given exception. + * + * @param \Throwable $exception The exception thrown from a watcher callback. + * + * @throws \Throwable If no error handler has been set. + */ + protected function error(\Throwable $exception): void + { + if ($this->errorHandler === null) { + throw $exception; + } + + ($this->errorHandler)($exception); + } + + /** + * Returns the current event loop time in second increments. + * + * Note this value does not necessarily correlate to wall-clock time, rather the value returned is meant to be used + * in relative comparisons to prior values returned by this method (intervals, expiration calculations, etc.). + */ + abstract protected function now(): float; + + /** + * @return bool True if no enabled and referenced watchers remain in the loop. + */ + private function isEmpty(): bool + { + foreach ($this->watchers as $watcher) { + if ($watcher->enabled && $watcher->referenced) { + return false; + } + } + + return true; + } + + /** + * Executes a single tick of the event loop. + */ + private function tick(): void + { + if (empty($this->deferQueue)) { + $this->deferQueue = $this->nextTickQueue; + } else { + $this->deferQueue = \array_merge($this->deferQueue, $this->nextTickQueue); + } + $this->nextTickQueue = []; + + $this->activate($this->enableQueue); + $this->enableQueue = []; + + foreach ($this->deferQueue as $watcher) { + if (!isset($this->deferQueue[$watcher->id])) { + continue; // Watcher disabled by another defer watcher. + } + + unset($this->watchers[$watcher->id], $this->deferQueue[$watcher->id]); + + $this->invokeCallback($watcher); + } + + /** @psalm-suppress RedundantCondition */ + $this->dispatch( + empty($this->nextTickQueue) + && empty($this->enableQueue) + && $this->running + && !$this->isEmpty() + ); + } + + private function invokeMicrotasks(): void + { + while ($this->microQueue) { + foreach ($this->microQueue as $id => [$callable, $args]) { + try { + unset($this->microQueue[$id]); + $callable(...$args); + } catch (\Throwable $exception) { + $this->error($exception); + } + } + } + } + + private function createFiber(): \Fiber + { + $fiber = new \Fiber(static function (): void { + $watcher = null; + while ($watcher = \Fiber::suspend($watcher)) { + $result = match (true) { + $watcher instanceof StreamWatcher => ($watcher->callback)($watcher->id, $watcher->stream), + $watcher instanceof SignalWatcher => ($watcher->callback)($watcher->id, $watcher->signal), + default => ($watcher->callback)($watcher->id), + }; + + if ($result !== null) { + throw InvalidCallbackError::noVoid($watcher->id, $watcher->callback); + } + } + }); + + $fiber->start(); + return $fiber; + } +} diff --git a/src/EventLoop/Internal/DeferWatcher.php b/src/EventLoop/Internal/DeferWatcher.php new file mode 100644 index 0000000..2abca4e --- /dev/null +++ b/src/EventLoop/Internal/DeferWatcher.php @@ -0,0 +1,8 @@ +pointers[$watcher->id])); + + $node = \count($this->watchers); + $this->watchers[$node] = $watcher; + $this->pointers[$watcher->id] = $node; + + $this->heapifyUp($node); + } + + /** + * Removes the given watcher from the queue. + * + * Time complexity: O(log(n)). + */ + public function remove(TimerWatcher $watcher): void + { + $id = $watcher->id; + + if (!isset($this->pointers[$id])) { + return; + } + + $this->removeAndRebuild($this->pointers[$id]); + } + + /** + * Deletes and returns the watcher on top of the heap if it has expired, otherwise null is returned. + * + * Time complexity: O(log(n)). + * + * @param float $now Current loop time. + * + * @return TimerWatcher|null Expired watcher at the top of the heap or null if the watcher has not expired. + */ + public function extract(float $now): ?TimerWatcher + { + if (!$this->watchers) { + return null; + } + + $watcher = $this->watchers[0]; + if ($watcher->expiration > $now) { + return null; + } + + $this->removeAndRebuild(0); + + return $watcher; + } + + /** + * Returns the expiration time value at the top of the heap. + * + * Time complexity: O(1). + * + * @return float|null Expiration time of the watcher at the top of the heap or null if the heap is empty. + */ + public function peek(): ?float + { + return isset($this->watchers[0]) ? $this->watchers[0]->expiration : null; + } + + /** + * @param int $node Rebuild the data array from the given node upward. + */ + private function heapifyUp(int $node): void + { + $entry = $this->watchers[$node]; + while ($node !== 0 && $entry->expiration < $this->watchers[$parent = ($node - 1) >> 1]->expiration) { + $this->swap($node, $parent); + $node = $parent; + } + } + + /** + * @param int $node Rebuild the data array from the given node downward. + */ + private function heapifyDown(int $node): void + { + $length = \count($this->watchers); + while (($child = ($node << 1) + 1) < $length) { + if ($this->watchers[$child]->expiration < $this->watchers[$node]->expiration + && ($child + 1 >= $length || $this->watchers[$child]->expiration < $this->watchers[$child + 1]->expiration) + ) { + // Left child is less than parent and right child. + $swap = $child; + } elseif ($child + 1 < $length && $this->watchers[$child + 1]->expiration < $this->watchers[$node]->expiration) { + // Right child is less than parent and left child. + $swap = $child + 1; + } else { // Left and right child are greater than parent. + break; + } + + $this->swap($node, $swap); + $node = $swap; + } + } + + private function swap(int $left, int $right): void + { + $temp = $this->watchers[$left]; + + $this->watchers[$left] = $this->watchers[$right]; + $this->pointers[$this->watchers[$right]->id] = $left; + + $this->watchers[$right] = $temp; + $this->pointers[$temp->id] = $right; + } + + /** + * @param int $node Remove the given node and then rebuild the data array. + */ + private function removeAndRebuild(int $node): void + { + $length = \count($this->watchers) - 1; + $id = $this->watchers[$node]->id; + $left = $this->watchers[$node] = $this->watchers[$length]; + $this->pointers[$left->id] = $node; + unset($this->watchers[$length], $this->pointers[$id]); + + if ($node < $length) { // don't need to do anything if we removed the last element + $parent = ($node - 1) >> 1; + if ($parent >= 0 && $this->watchers[$node]->expiration < $this->watchers[$parent]->expiration) { + $this->heapifyUp($node); + } else { + $this->heapifyDown($node); + } + } + } +} diff --git a/src/EventLoop/Internal/TimerWatcher.php b/src/EventLoop/Internal/TimerWatcher.php new file mode 100644 index 0000000..42309d3 --- /dev/null +++ b/src/EventLoop/Internal/TimerWatcher.php @@ -0,0 +1,17 @@ +callback = $callback; + } + + /** + * @param string $property + * + * @psalm-return no-return + */ + public function __get(string $property): void + { + throw new \Error("Unknown property '${property}'"); + } + + /** + * @param string $property + * @param mixed $value + * + * @psalm-return no-return + */ + public function __set(string $property, mixed $value): void + { + throw new \Error("Unknown property '${property}'"); + } +} diff --git a/src/EventLoop/Internal/functions.php b/src/EventLoop/Internal/functions.php new file mode 100644 index 0000000..33f358c --- /dev/null +++ b/src/EventLoop/Internal/functions.php @@ -0,0 +1,21 @@ + throw $exception); + } +} diff --git a/src/EventLoop/InvalidCallbackError.php b/src/EventLoop/InvalidCallbackError.php new file mode 100644 index 0000000..3de59a6 --- /dev/null +++ b/src/EventLoop/InvalidCallbackError.php @@ -0,0 +1,55 @@ +getFileName() && $reflection->getStartLine()) { + $errorDetail = " defined in " . $reflection->getFileName() . ':' . $reflection->getStartLine(); + } + } catch (\ReflectionException $e) { + // ignore + } + } + + return new self($watcherId, 'Non-null return value received from callback' . $errorDetail); + } + + /** @var string */ + private string $watcherId; + + /** + * @param string $watcherId The watcher identifier. + * @param string $message The exception message. + */ + private function __construct(string $watcherId, string $message) + { + $this->watcherId = $watcherId; + parent::__construct($message); + } + + /** + * @return string The watcher identifier. + */ + public function getWatcherId(): string + { + return $this->watcherId; + } +} diff --git a/src/EventLoop/InvalidWatcherError.php b/src/EventLoop/InvalidWatcherError.php new file mode 100644 index 0000000..f77d17c --- /dev/null +++ b/src/EventLoop/InvalidWatcherError.php @@ -0,0 +1,32 @@ +watcherId = $watcherId; + parent::__construct($message); + } + + /** + * @return string The watcher identifier. + */ + public function getWatcherId(): string + { + return $this->watcherId; + } +} diff --git a/src/EventLoop/Suspension.php b/src/EventLoop/Suspension.php new file mode 100644 index 0000000..d2e4b27 --- /dev/null +++ b/src/EventLoop/Suspension.php @@ -0,0 +1,111 @@ +then(fn ($value) => $suspension->resume($value), fn ($throwable) => $suspension->throw($throwable)); + * + * $suspension->suspend(); + * ``` + */ +final class Suspension +{ + private ?\Fiber $fiber; + private \Fiber $scheduler; + private Driver $driver; + private bool $pending = false; + + /** + * Suspension constructor. + * + * @param Driver $driver + * @param \Fiber $scheduler + * + * @internal + */ + public function __construct(Driver $driver, \Fiber $scheduler) + { + $this->driver = $driver; + $this->fiber = \Fiber::getCurrent(); + + if ($this->fiber === $scheduler) { + throw new \Error(\sprintf( + 'Cannot call %s() within a scheduler microtask (%s::queue() callback)', + __METHOD__, + EventLoop::class, + )); + } + + $this->scheduler = $scheduler; + } + + public function throw(\Throwable $throwable): void + { + if (!$this->pending) { + throw new \Error('Must call throw() before calling resume()'); + } + + $this->pending = false; + + if ($this->fiber) { + $this->driver->queue([$this->fiber, 'throw'], $throwable); + } else { + // Suspend event loop fiber to {main}. + $this->driver->queue([\Fiber::class, 'suspend'], static fn () => throw $throwable); + } + } + + public function resume(mixed $value): void + { + if (!$this->pending) { + throw new \Error('Must call suspend() before calling resume()'); + } + + $this->pending = false; + + if ($this->fiber) { + $this->driver->queue([$this->fiber, 'resume'], $value); + } else { + // Suspend event loop fiber to {main}. + $this->driver->queue([\Fiber::class, 'suspend'], static fn () => $value); + } + } + + public function suspend(): mixed + { + if ($this->pending) { + throw new \Error('Must call resume() or throw() before calling suspend() again'); + } + + if ($this->fiber !== \Fiber::getCurrent()) { + throw new \Error('Must not call suspend() from another fiber'); + } + + $this->pending = true; + + // Awaiting from within a fiber. + if ($this->fiber) { + return \Fiber::suspend(); + } + + // Awaiting from {main}. + $lambda = $this->scheduler->isStarted() ? $this->scheduler->resume() : $this->scheduler->start(); + + /** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */ + if ($this->pending) { + // Should only be true if the event loop exited without resolving the promise. + throw new \Error('Scheduler suspended or exited unexpectedly'); + } + + return $lambda(); + } +} diff --git a/src/EventLoop/UnsupportedFeatureException.php b/src/EventLoop/UnsupportedFeatureException.php new file mode 100644 index 0000000..5cf0449 --- /dev/null +++ b/src/EventLoop/UnsupportedFeatureException.php @@ -0,0 +1,12 @@ +loop = ($this->getFactory())(); + \gc_collect_cycles(); + } + + public function tearDown(): void + { + unset($this->loop); + } + + public function testCorrectTimeoutIfBlockingBeforeActivate(): void + { + $start = 0; + $invoked = 0; + + $this->start(function (Driver $loop) use (&$start, &$invoked): void { + $loop->defer(function () use ($loop, &$start, &$invoked) { + $start = now(); + + $loop->delay(1, function () use (&$invoked) { + $invoked = now(); + }); + + \usleep(500000); + }); + }); + + self::assertNotSame(0, $start); + self::assertNotSame(0, $invoked); + + self::assertGreaterThanOrEqual(1, $invoked - $start); + self::assertLessThan(1.1, $invoked - $start); + } + + public function testCorrectTimeoutIfBlockingBeforeDelay(): void + { + $start = 0; + $invoked = 0; + + $this->start(function (Driver $loop) use (&$start, &$invoked): void { + $start = now(); + + \usleep(500000); + + $loop->delay(1, function () use (&$invoked) { + $invoked = now(); + }); + }); + + self::assertNotSame(0, $start); + self::assertNotSame(0, $invoked); + + self::assertGreaterThanOrEqual(1.5, $invoked - $start); + self::assertLessThan(1.6, $invoked - $start); + } + + public function testLoopTerminatesWithOnlyUnreferencedWatchers(): void + { + $this->start(function (Driver $loop) use (&$end): void { + $loop->unreference($loop->onReadable(STDIN, static function (): void { + })); + $w = $loop->delay(10, static function (): void { + }); + $loop->defer(function () use ($loop, $w): void { + $loop->cancel($w); + }); + $end = true; + }); + self::assertTrue($end); + } + + /** This MUST NOT have a "test" prefix, otherwise it's executed as test and marked as risky. */ + public function checkForSignalCapability(): void + { + if (!\extension_loaded('posix')) { + self::markTestSkipped("ext-posix is required for sending test signals. Skipping."); + } + + try { + $watcher = $this->loop->onSignal(SIGUSR1, function (): void { + }); + $this->loop->cancel($watcher); + } catch (UnsupportedFeatureException $e) { + self::markTestSkipped("The loop is not capable of handling signals properly. Skipping."); + } + } + + public function testWatcherUnrefRerefRunResult(): void + { + $invoked = false; + $this->start(function (Driver $loop) use (&$invoked): void { + $watcher = $loop->defer(function () use (&$invoked): void { + $invoked = true; + }); + $loop->unreference($watcher); + $loop->unreference($watcher); + $loop->reference($watcher); + }); + self::assertTrue($invoked); + } + + public function testDeferWatcherUnrefRunResult(): void + { + $invoked = false; + $this->start(function (Driver $loop) use (&$invoked): void { + $watcher = $loop->defer(function () use (&$invoked): void { + $invoked = true; + }); + $loop->unreference($watcher); + }); + self::assertFalse($invoked); + } + + public function testOnceWatcherUnrefRunResult(): void + { + $invoked = false; + $this->start(function (Driver $loop) use (&$invoked): void { + $watcher = $loop->delay(2, function () use (&$invoked): void { + $invoked = true; + }); + $loop->unreference($watcher); + $loop->unreference($watcher); + }); + self::assertFalse($invoked); + } + + public function testRepeatWatcherUnrefRunResult(): void + { + $invoked = false; + $this->start(function (Driver $loop) use (&$invoked): void { + $watcher = $loop->repeat(2, function () use (&$invoked): void { + $invoked = true; + }); + $loop->unreference($watcher); + }); + self::assertFalse($invoked); + } + + public function testOnReadableWatcherUnrefRunResult(): void + { + $invoked = false; + $this->start(function (Driver $loop) use (&$invoked): void { + $watcher = $loop->onReadable(STDIN, function () use (&$invoked): void { + $invoked = true; + }); + $loop->unreference($watcher); + }); + self::assertFalse($invoked); + } + + public function testOnWritableWatcherKeepAliveRunResult(): void + { + $invoked = false; + $this->start(function (Driver $loop) use (&$invoked): void { + $watcher = $loop->onWritable(STDOUT, function () use (&$invoked): void { + $invoked = true; + }); + $loop->unreference($watcher); + }); + self::assertFalse($invoked); + } + + public function testOnSignalWatcherKeepAliveRunResult(): void + { + $this->checkForSignalCapability(); + $invoked = false; + $this->start(function (Driver $loop) use (&$invoked): void { + $watcher = $loop->onSignal(SIGUSR1, function () { + // empty + }); + $watcher = $loop->delay(0.1, function () use (&$invoked, $loop, $watcher): void { + $invoked = true; + $loop->unreference($watcher); + }); + $loop->unreference($watcher); + }); + self::assertTrue($invoked); + } + + public function testUnreferencedDeferWatcherStillExecutes(): void + { + $invoked = false; + $this->start(function (Driver $loop) use (&$invoked): void { + $watcher = $loop->defer(function () use (&$invoked): void { + $invoked = true; + }); + $loop->unreference($watcher); + $loop->defer(function () { + // just to keep loop running + }); + }); + self::assertTrue($invoked); + } + + public function testLoopDoesNotBlockOnNegativeTimerExpiration(): void + { + $invoked = false; + $this->start(function (Driver $loop) use (&$invoked): void { + $loop->delay(0.001, function () use (&$invoked): void { + $invoked = true; + }); + + \usleep(1000 * 10); + }); + self::assertTrue($invoked); + } + + public function testDisabledDeferReenableInSubsequentTick(): void + { + $this->expectOutputString("123"); + $this->start(function (Driver $loop) { + $watcherId = $loop->defer(function ($watcherId): void { + echo 3; + }); + $loop->disable($watcherId); + $loop->defer(function () use ($loop, $watcherId): void { + $loop->enable($watcherId); + echo 2; + }); + echo 1; + }); + } + + public function provideRegistrationArgs(): array + { + $args = [ + [ + "defer", + [ + function () { + }, + ], + ], + [ + "delay", + [ + 0.005, + function () { + }, + ], + ], + [ + "repeat", + [ + 0.005, + function () { + }, + ], + ], + [ + "onWritable", + [ + \STDOUT, + function () { + }, + ], + ], + [ + "onReadable", + [ + \STDIN, + function () { + }, + ], + ], + [ + "onSignal", + [ + \SIGUSR1, + function () { + }, + ], + ], + ]; + + return $args; + } + + /** @dataProvider provideRegistrationArgs */ + public function testDisableWithConsecutiveCancel(string $type, array $args): void + { + if ($type === "onSignal") { + $this->checkForSignalCapability(); + } + + $invoked = false; + $this->start(function (Driver $loop) use (&$invoked, $type, $args): void { + $func = [$loop, $type]; + $watcherId = $func(...$args); + $loop->disable($watcherId); + $loop->defer(function () use (&$invoked, $loop, $watcherId): void { + $loop->cancel($watcherId); + $invoked = true; + }); + $this->assertFalse($invoked); + }); + self::assertTrue($invoked); + } + + /** @dataProvider provideRegistrationArgs */ + public function testWatcherReferenceInfo(string $type, array $args): void + { + if ($type === "onSignal") { + $this->checkForSignalCapability(); + } + + $loop = $this->loop; + + $func = [$loop, $type]; + if (\substr($type, 0, 2) === "on") { + $type = "on_" . \lcfirst(\substr($type, 2)); + } + + // being referenced is the default + $watcherId1 = $func(...$args); + $info = $loop->getInfo(); + $expected = ["enabled" => 1, "disabled" => 0]; + self::assertSame($expected, $info[$type]); + $expected = ["referenced" => 1, "unreferenced" => 0]; + self::assertSame($expected, $info["enabled_watchers"]); + + // explicitly reference() even though it's the default setting + $argsCopy = $args; + $watcherId2 = \call_user_func_array($func, $argsCopy); + $loop->reference($watcherId2); + $loop->reference($watcherId2); + $info = $loop->getInfo(); + $expected = ["enabled" => 2, "disabled" => 0]; + self::assertSame($expected, $info[$type]); + $expected = ["referenced" => 2, "unreferenced" => 0]; + self::assertSame($expected, $info["enabled_watchers"]); + + // disabling a referenced watcher should decrement the referenced count + $loop->disable($watcherId2); + $loop->disable($watcherId2); + $loop->disable($watcherId2); + $info = $loop->getInfo(); + $expected = ["referenced" => 1, "unreferenced" => 0]; + self::assertSame($expected, $info["enabled_watchers"]); + + // enabling a referenced watcher should increment the referenced count + $loop->enable($watcherId2); + $loop->enable($watcherId2); + $info = $loop->getInfo(); + $expected = ["referenced" => 2, "unreferenced" => 0]; + self::assertSame($expected, $info["enabled_watchers"]); + + // cancelling an referenced watcher should decrement the referenced count + $loop->cancel($watcherId2); + $info = $loop->getInfo(); + $expected = ["referenced" => 1, "unreferenced" => 0]; + self::assertSame($expected, $info["enabled_watchers"]); + + // unreference() should just increment unreferenced count + $watcherId2 = $func(...$args); + $loop->unreference($watcherId2); + $info = $loop->getInfo(); + $expected = ["enabled" => 2, "disabled" => 0]; + self::assertSame($expected, $info[$type]); + $expected = ["referenced" => 1, "unreferenced" => 1]; + self::assertSame($expected, $info["enabled_watchers"]); + + $loop->cancel($watcherId1); + $loop->cancel($watcherId2); + } + + /** @dataProvider provideRegistrationArgs */ + public function testWatcherRegistrationAndCancellationInfo(string $type, array $args): void + { + if ($type === "onSignal") { + $this->checkForSignalCapability(); + } + + $loop = $this->loop; + + $func = [$loop, $type]; + if (\substr($type, 0, 2) === "on") { + $type = "on_" . \lcfirst(\substr($type, 2)); + } + + $watcherId = $func(...$args); + self::assertIsString($watcherId); + $info = $loop->getInfo(); + $expected = ["enabled" => 1, "disabled" => 0]; + self::assertSame($expected, $info[$type]); + + // invoke enable() on active watcher to ensure it has no side-effects + $loop->enable($watcherId); + $info = $loop->getInfo(); + $expected = ["enabled" => 1, "disabled" => 0]; + self::assertSame($expected, $info[$type]); + + // invoke disable() twice to ensure it has no side-effects + $loop->disable($watcherId); + $loop->disable($watcherId); + + $info = $loop->getInfo(); + $expected = ["enabled" => 0, "disabled" => 1]; + self::assertSame($expected, $info[$type]); + + $loop->cancel($watcherId); + $info = $loop->getInfo(); + $expected = ["enabled" => 0, "disabled" => 0]; + self::assertSame($expected, $info[$type]); + + $watcherId = $func(...$args); + $info = $loop->getInfo(); + $expected = ["enabled" => 1, "disabled" => 0]; + self::assertSame($expected, $info[$type]); + + $loop->disable($watcherId); + $info = $loop->getInfo(); + $expected = ["enabled" => 0, "disabled" => 1]; + self::assertSame($expected, $info[$type]); + + $loop->enable($watcherId); + $info = $loop->getInfo(); + $expected = ["enabled" => 1, "disabled" => 0]; + self::assertSame($expected, $info[$type]); + + $loop->cancel($watcherId); + $info = $loop->getInfo(); + $expected = ["enabled" => 0, "disabled" => 0]; + self::assertSame($expected, $info[$type]); + + $loop->disable($watcherId); + $info = $loop->getInfo(); + $expected = ["enabled" => 0, "disabled" => 0]; + self::assertSame($expected, $info[$type]); + } + + /** + * @dataProvider provideRegistrationArgs + * @group memoryleak + */ + public function testNoMemoryLeak(string $type, array $args): void + { + if ($this->getTestResultObject()->getCollectCodeCoverageInformation()) { + self::markTestSkipped("Cannot run this test with code coverage active [code coverage consumes memory which makes it impossible to rely on memory_get_usage()]"); + } + + if (\DIRECTORY_SEPARATOR === '\\') { + self::markTestSkipped('Skip on Windows for now, investigate'); + } + + $runs = 2000; + + if ($type === "onSignal") { + $this->checkForSignalCapability(); + } + + $this->start(function (Driver $loop) use ($type, $args, $runs) { + $initialMem = \memory_get_usage(); + $cb = function ($runs) use ($loop, $type, $args): void { + $func = [$loop, $type]; + for ($watchers = [], $i = 0; $i < $runs; $i++) { + $watchers[] = $func(...$args); + } + foreach ($watchers as $watcher) { + $loop->cancel($watcher); + } + for ($watchers = [], $i = 0; $i < $runs; $i++) { + $watchers[] = $func(...$args); + } + foreach ($watchers as $watcher) { + $loop->disable($watcher); + $loop->cancel($watcher); + } + for ($watchers = [], $i = 0; $i < $runs; $i++) { + $watchers[] = $func(...$args); + } + if ($type === "repeat") { + $loop->delay(0.007, function () use ($loop, $watchers): void { + foreach ($watchers as $watcher) { + $loop->cancel($watcher); + } + }); + } elseif ($type !== "defer" && $type !== "delay") { + $loop->defer(function () use ($loop, $watchers) { + foreach ($watchers as $watcher) { + $loop->cancel($watcher); + } + }); + } + $loop->run(); + if ($type === "defer") { + $loop->defer($fn = static function () use (&$fn, $loop, $runs): void { + static $i = null; + + $i = $i ?? $runs; + + if ($i--) { + $loop->defer($fn); + } + }); + $loop->run(); + } + if ($type === "delay") { + $loop->delay(0, $fn = static function () use (&$fn, $loop, $runs): void { + static $i = null; + + $i = $i ?? $runs; + + if ($i--) { + $loop->delay(0, $fn); + } + }); + $loop->run(); + } + if ($type === "repeat") { + $loop->repeat(0, $fn = static function ($watcherId) use (&$fn, $loop, $runs): void { + static $i = null; + + $i = $i ?? $runs; + + $loop->cancel($watcherId); + if ($i--) { + $loop->repeat(0, $fn); + } + }); + $loop->run(); + } + if ($type === "onWritable") { + $loop->defer(static function ($watcherId) use ($loop, $runs): void { + $fn = static function ($watcherId, $socket) use (&$fn, $loop, $runs): void { + static $i = null; + + $i = $i ?? ($runs + 1); + + $loop->cancel($watcherId); + if ($socket) { + \fwrite($socket, "."); + } + + if ($i--) { + // explicitly use *different* streams with *different* resource ids + $ends = \stream_socket_pair( + \stripos( + PHP_OS, + "win" + ) === 0 ? STREAM_PF_INET : STREAM_PF_UNIX, + STREAM_SOCK_STREAM, + STREAM_IPPROTO_IP + ); + + $loop->onWritable($ends[0], $fn); + $loop->onReadable($ends[1], function ($watcherId) use ($loop): void { + $loop->cancel($watcherId); + }); + } + }; + + $fn($watcherId, null); + }); + $loop->run(); + } + if ($type === "onSignal") { + $sendSignal = function (): void { + \posix_kill(\getmypid(), \SIGUSR1); + }; + $loop->onSignal( + \SIGUSR1, + $fn = static function ($watcherId) use (&$fn, $loop, $sendSignal, $runs): void { + static $i = null; + + $i = $i ?? $runs; + + if ($i--) { + $loop->onSignal(\SIGUSR1, $fn); + $loop->delay(0.001, $sendSignal); + } + $loop->cancel($watcherId); + } + ); + $loop->delay(0.001, $sendSignal); + $loop->run(); + } + }; + $closureMem = \memory_get_usage() - $initialMem; + $cb($runs); /* just to set up eventual structures inside loop without counting towards memory comparison */ + \gc_collect_cycles(); + $initialMem = \memory_get_usage() - $closureMem; + $cb($runs); + unset($cb); + + \gc_collect_cycles(); + $endMem = \memory_get_usage(); + + /* this is allowing some memory usage due to runtime caches etc., but nothing actually leaking */ + $this->assertLessThan($runs * 4, $endMem - $initialMem); // * 4, as 4 is minimal sizeof(void *) + }); + } + + /** + * The first number of each tuple indicates the tick in which the watcher is supposed to execute, the second digit + * indicates the order within the tick. + */ + public function testExecutionOrderGuarantees(): void + { + $this->expectOutputString("01 02 03 04 " . \str_repeat("05 ", 8) . "10 11 12 " . \str_repeat( + "13 ", + 4 + ) . "20 " . \str_repeat("21 ", 4) . "30 40 41 "); + $this->start(function (Driver $loop): void { + // Wrap in extra defer, so driver creation time doesn't count for timers, as timers are driver creation + // relative instead of last tick relative before first tick. + $loop->defer(function () use ($loop): void { + $f = function (...$args) use ($loop): callable { + return function ($watcherId) use ($loop, &$args): void { + if (!$args) { + $this->fail("Watcher callback called too often"); + } + $loop->cancel($watcherId); + echo \array_shift($args) . \array_shift($args), " "; + }; + }; + + $loop->onWritable(STDOUT, $f(0, 5)); + $writ1 = $loop->onWritable(STDOUT, $f(0, 5)); + $writ2 = $loop->onWritable(STDOUT, $f(0, 5)); + + $loop->delay(0, $f(0, 5)); + $del1 = $loop->delay(0, $f(0, 5)); + $del2 = $loop->delay(0, $f(0, 5)); + $del3 = $loop->delay(0, $f()); + $del4 = $loop->delay(0, $f(1, 3)); + $del5 = $loop->delay(0, $f(2, 0)); + $loop->defer(function () use ($loop, $del5): void { + $loop->disable($del5); + }); + $loop->cancel($del3); + $loop->disable($del1); + $loop->disable($del2); + + $writ3 = $loop->onWritable(STDOUT, $f()); + $loop->cancel($writ3); + $loop->disable($writ1); + $loop->disable($writ2); + $loop->enable($writ1); + $writ4 = $loop->onWritable(STDOUT, $f(1, 3)); + $loop->onWritable(STDOUT, $f(0, 5)); + $loop->enable($writ2); + $loop->disable($writ4); + $loop->defer(function () use ($loop, $writ4, $f): void { + $loop->enable($writ4); + $loop->onWritable(STDOUT, $f(1, 3)); + }); + + $loop->enable($del1); + $loop->delay(0, $f(0, 5)); + $loop->enable($del2); + $loop->disable($del4); + $loop->defer(function () use ($loop, $del4, $f): void { + $loop->enable($del4); + $loop->onWritable(STDOUT, $f(1, 3)); + }); + + $loop->delay(1, $f(4, 1)); + $loop->delay(0.6, $f(3, 0)); + $loop->delay(0.5, $f(2, 1)); + $loop->repeat(0.5, $f(2, 1)); + $rep1 = $loop->repeat(0.25, $f(2, 1)); + $loop->disable($rep1); + $loop->delay(0.5, $f(2, 1)); + $loop->enable($rep1); + + $loop->defer($f(0, 1)); + $def1 = $loop->defer($f(0, 3)); + $def2 = $loop->defer($f(1, 1)); + $def3 = $loop->defer($f()); + $loop->defer($f(0, 2)); + $loop->disable($def1); + $loop->cancel($def3); + $loop->enable($def1); + $loop->defer(function () use ($loop, $def2, $del5, $f): void { + $tick = $f(0, 4); + $tick("invalid"); + $loop->defer($f(1, 0)); + $loop->enable($def2); + $loop->defer($f(1, 2)); + $loop->defer(function () use ($loop, $del5, $f): void { + $loop->enable($del5); + $loop->defer(function () use ($loop, $f): void { + \usleep(700000); // to have delays of 0.5 and 0.6 run at the same tick (but not 0.15) + $loop->defer(function () use ($loop, $f): void { + $loop->defer($f(4, 0)); + }); + }); + }); + }); + $loop->disable($def2); + }); + }); + } + + public function testSignalExecutionOrder(): void + { + $this->checkForSignalCapability(); + + $this->expectOutputString("122222"); + $this->start(function (Driver $loop): void { + $f = function ($i) use ($loop) { + return function ($watcherId) use ($loop, $i): void { + $loop->cancel($watcherId); + echo $i; + }; + }; + + $loop->defer($f(1)); + $loop->onSignal(SIGUSR1, $f(2)); + $sig1 = $loop->onSignal(SIGUSR1, $f(2)); + $sig2 = $loop->onSignal(SIGUSR1, $f(2)); + $sig3 = $loop->onSignal(SIGUSR1, $f(" FAIL - MUST NOT BE CALLED ")); + $loop->disable($sig1); + $loop->onSignal(SIGUSR1, $f(2)); + $loop->disable($sig2); + $loop->enable($sig1); + $loop->cancel($sig3); + $loop->onSignal(SIGUSR1, $f(2)); + $loop->defer(function () use ($loop, $sig2): void { + $loop->enable($sig2); + $loop->delay(0.001, function () use ($loop) { + \posix_kill(\getmypid(), \SIGUSR1); + $loop->delay(0.01, function () use ($loop): void { + $loop->stop(); + }); + }); + }); + }); + } + + public function testExceptionOnEnableNonexistentWatcher(): void + { + $this->expectException(InvalidWatcherError::class); + + try { + $this->loop->enable("nonexistentWatcher"); + } catch (InvalidWatcherError $e) { + self::assertSame("nonexistentWatcher", $e->getWatcherId()); + throw $e; + } + } + + public function testSuccessOnDisableNonexistentWatcher(): void + { + $this->loop->disable("nonexistentWatcher"); + + // Otherwise risky, throwing fails the test + self::assertTrue(true); + } + + public function testSuccessOnCancelNonexistentWatcher(): void + { + $this->loop->cancel("nonexistentWatcher"); + + // Otherwise risky, throwing fails the test + self::assertTrue(true); + } + + public function testExceptionOnReferenceNonexistentWatcher(): void + { + $this->expectException(InvalidWatcherError::class); + + try { + $this->loop->reference("nonexistentWatcher"); + } catch (InvalidWatcherError $e) { + self::assertSame("nonexistentWatcher", $e->getWatcherId()); + throw $e; + } + } + + public function testSuccessOnUnreferenceNonexistentWatcher(): void + { + $this->loop->unreference("nonexistentWatcher"); + + // Otherwise risky, throwing fails the test + self::assertTrue(true); + } + + public function testWatcherInvalidityOnDefer(): void + { + $this->expectException(InvalidWatcherError::class); + + $this->start(function (Driver $loop): void { + $loop->defer(function ($watcher) use ($loop): void { + $loop->enable($watcher); + }); + }); + } + + public function testWatcherInvalidityOnDelay(): void + { + $this->expectException(InvalidWatcherError::class); + + $this->start(function (Driver $loop): void { + $loop->delay(0, function ($watcher) use ($loop): void { + $loop->enable($watcher); + }); + }); + } + + public function testEventsNotExecutedInSameTickAsEnabled(): void + { + $this->start(function (Driver $loop): void { + $loop->defer(function () use ($loop): void { + $loop->defer(function () use ($loop, &$diswatchers, &$watchers): void { + $loop->defer(function () use ($loop, $diswatchers): void { + foreach ($diswatchers as $watcher) { + $loop->disable($watcher); + } + $loop->defer(function () use ($loop, $diswatchers): void { + $loop->defer(function () use ($loop, $diswatchers): void { + foreach ($diswatchers as $watcher) { + $loop->cancel($watcher); + } + }); + foreach ($diswatchers as $watcher) { + $loop->enable($watcher); + } + }); + }); + foreach ($watchers as $watcher) { + $loop->cancel($watcher); + } + foreach ($diswatchers as $watcher) { + $loop->disable($watcher); + $loop->enable($watcher); + } + }); + + $f = function () use ($loop): array { + $watchers[] = $loop->defer([$this, "fail"]); + $watchers[] = $loop->delay(0, [$this, "fail"]); + $watchers[] = $loop->repeat(0, [$this, "fail"]); + $watchers[] = $loop->onWritable(STDIN, [$this, "fail"]); + return $watchers; + }; + $watchers = $f(); + $diswatchers = $f(); + }); + }); + + // Otherwise risky, as we only rely on $this->fail() + self::assertTrue(true); + } + + public function testEnablingWatcherAllowsSubsequentInvocation(): void + { + $increment = 0; + $watcherId = $this->loop->defer(function () use (&$increment): void { + $increment++; + }); + $this->loop->disable($watcherId); + $this->loop->delay(0.005, [$this->loop, "stop"]); + $this->loop->run(); + self::assertSame(0, $increment); + $this->loop->enable($watcherId); + $this->loop->delay(0.005, [$this->loop, "stop"]); + $this->loop->run(); + self::assertSame(1, $increment); + } + + public function testUnresolvedEventsAreReenabledOnRunFollowingPreviousStop(): void + { + $increment = 0; + $this->start(function (Driver $loop) use (&$increment): void { + $loop->defer([$loop, "stop"]); + $loop->run(); + + $loop->defer(function () use (&$increment, $loop): void { + $loop->delay(0.1, function () use ($loop, &$increment): void { + $increment++; + $loop->stop(); + }); + }); + + $this->assertSame(0, $increment); + \usleep(5000); + }); + self::assertSame(1, $increment); + } + + public function testTimerWatcherParameterOrder(): void + { + $this->start(function (Driver $loop): void { + $counter = 0; + $loop->defer(function ($watcherId) use ($loop, &$counter): void { + $this->assertIsString($watcherId); + if (++$counter === 3) { + $loop->stop(); + } + }); + $loop->delay(0.005, function ($watcherId) use ($loop, &$counter): void { + $this->assertIsString($watcherId); + if (++$counter === 3) { + $loop->stop(); + } + }); + $loop->repeat(0.005, function ($watcherId) use ($loop, &$counter): void { + $this->assertIsString($watcherId); + $loop->cancel($watcherId); + if (++$counter === 3) { + $loop->stop(); + } + }); + }); + } + + public function testStreamWatcherParameterOrder(): void + { + $this->start(function (Driver $loop) use (&$invoked): void { + $invoked = 0; + $loop->onWritable(STDOUT, function ($watcherId, $stream) use ($loop, &$invoked): void { + $this->assertIsString($watcherId); + $this->assertSame(STDOUT, $stream); + $invoked++; + $loop->cancel($watcherId); + }); + }); + self::assertSame(1, $invoked); + } + + public function testDisablingWatcherPreventsSubsequentInvocation(): void + { + $this->start(function (Driver $loop): void { + $increment = 0; + $watcherId = $loop->defer(function () use (&$increment): void { + $increment++; + }); + + $loop->disable($watcherId); + $loop->delay(0.005, [$loop, "stop"]); + + $this->assertSame(0, $increment); + }); + } + + public function testImmediateExecution(): void + { + $increment = 0; + $this->start(function (Driver $loop) use (&$increment): void { + $loop->defer(function () use (&$increment): void { + $increment++; + }); + $loop->defer([$loop, "stop"]); + }); + self::assertSame(1, $increment); + } + + public function testImmediatelyCallbacksDoNotRecurseInSameTick(): void + { + $increment = 0; + $this->start(function (Driver $loop) use (&$increment): void { + $loop->defer(function () use ($loop, &$increment) { + $increment++; + $loop->defer(function () use (&$increment) { + $increment++; + }); + }); + $loop->defer([$loop, "stop"]); + }); + self::assertSame(1, $increment); + } + + public function testRunExecutesEventsUntilExplicitlyStopped(): void + { + $increment = 0; + $this->start(function (Driver $loop) use (&$increment): void { + $loop->repeat(0.005, function ($watcherId) use ($loop, &$increment): void { + $increment++; + if ($increment === 10) { + $loop->cancel($watcherId); + } + }); + }); + self::assertSame(10, $increment); + } + + public function testLoopAllowsExceptionToBubbleUpDuringStart(): void + { + $this->expectException(\Exception::class); + $this->expectExceptionMessage("loop error"); + + $this->start(function (Driver $loop): void { + $loop->defer(function (): void { + throw new \Exception("loop error"); + }); + }); + } + + public function testLoopAllowsExceptionToBubbleUpFromRepeatingAlarmDuringStart(): void + { + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage("test"); + + $this->start(function (Driver $loop): void { + $loop->repeat(0.001, function (): void { + throw new \RuntimeException("test"); + }); + }); + } + + public function testErrorHandlerCapturesUncaughtException(): void + { + $msg = ""; + $this->loop->setErrorHandler($f = function (): void { + }); + $oldErrorHandler = $this->loop->setErrorHandler(function (\Exception $error) use (&$msg): void { + $msg = $error->getMessage(); + }); + self::assertSame($f, $oldErrorHandler); + $this->start(function (Driver $loop) { + $loop->defer(function () { + throw new \Exception("loop error"); + }); + }); + self::assertSame("loop error", $msg); + } + + public function testOnErrorFailure(): void + { + $this->expectException(\Exception::class); + $this->expectExceptionMessage("errorception"); + + $this->loop->setErrorHandler(function (): void { + throw new \Exception("errorception"); + }); + $this->start(function (Driver $loop): void { + $loop->delay(0.005, function () { + throw new \Exception("error"); + }); + }); + } + + public function testLoopException(): void + { + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage("test"); + + $this->start(function (Driver $loop): void { + $loop->defer(function () use ($loop): void { + // force next tick, outside of primary startup tick + $loop->defer(function () { + throw new \RuntimeException("test"); + }); + }); + }); + } + + public function testOnSignalWatcher(): void + { + $this->checkForSignalCapability(); + + $this->expectOutputString("caught SIGUSR1"); + $this->start(function (Driver $loop): void { + $loop->delay(0.001, function () use ($loop): void { + \posix_kill(\getmypid(), \SIGUSR1); + $loop->delay(0.01, [$loop, "stop"]); + }); + + $loop->onSignal(SIGUSR1, function ($watcherId) use ($loop): void { + $loop->cancel($watcherId); + echo "caught SIGUSR1"; + }); + }); + } + + public function testInitiallyDisabledOnSignalWatcher(): void + { + $this->checkForSignalCapability(); + + $this->expectOutputString("caught SIGUSR1"); + $this->start(function (Driver $loop): void { + $stop = $loop->delay(0.1, function () use ($loop): void { + echo "ERROR: manual stop"; + $loop->stop(); + }); + $watcherId = $loop->onSignal(SIGUSR1, function ($watcherId) use ($loop, $stop): void { + echo "caught SIGUSR1"; + $loop->disable($stop); + $loop->disable($watcherId); + }); + $loop->disable($watcherId); + + $loop->delay(0.001, function () use ($loop, $watcherId): void { + $loop->enable($watcherId); + $loop->delay(0.001, function () { + \posix_kill(\getmypid(), SIGUSR1); + }); + }); + }); + } + + public function testNestedLoopSignalDispatch(): void + { + $this->checkForSignalCapability(); + + $this->expectOutputString("inner SIGUSR2\nouter SIGUSR1\n"); + $this->start(function (Driver $loop): void { + $loop->delay(0.3, function () use ($loop): void { + $loop->stop(); + }); + $loop->onSignal(SIGUSR1, function () use ($loop): void { + echo "outer SIGUSR1\n"; + $loop->stop(); + }); + + $loop->delay(0.001, function (): void { + /** @var Driver $loop */ + $loop = ($this->getFactory())(); + $stop = $loop->delay(0.1, function () use ($loop): void { + echo "ERROR: manual stop"; + $loop->stop(); + }); + $loop->onSignal(SIGUSR2, function ($watcherId) use ($loop, $stop): void { + echo "inner SIGUSR2\n"; + $loop->cancel($stop); + $loop->cancel($watcherId); + }); + $loop->delay(0.001, function (): void { + \posix_kill(\getmypid(), SIGUSR2); + }); + $loop->run(); + }); + + $loop->delay(0.02, function (): void { + \posix_kill(\getmypid(), \SIGUSR1); + }); + }); + } + + public function testCancelRemovesWatcher(): void + { + $invoked = false; + + $this->start(function (Driver $loop) use (&$invoked): void { + $watcherId = $loop->delay(0.01, function (): void { + $this->fail('Watcher was not cancelled as expected'); + }); + + $loop->defer(function () use ($loop, $watcherId, &$invoked): void { + $loop->cancel($watcherId); + $invoked = true; + }); + + $loop->delay(0.005, [$loop, "stop"]); + }); + + self::assertTrue($invoked); + } + + public function testOnWritableWatcher(): void + { + $flag = false; + $this->start(function (Driver $loop) use (&$flag): void { + $loop->onWritable(STDOUT, function () use ($loop, &$flag) { + $flag = true; + $loop->stop(); + }); + $loop->delay(0.005, [$loop, "stop"]); + }); + self::assertTrue($flag); + } + + public function testInitiallyDisabledWriteWatcher(): void + { + $increment = 0; + $this->start(function (Driver $loop): void { + $watcherId = $loop->onWritable(STDOUT, function () use (&$increment): void { + $increment++; + }); + $loop->disable($watcherId); + $loop->delay(0.005, [$loop, "stop"]); + }); + self::assertSame(0, $increment); + } + + public function testInitiallyDisabledWriteWatcherIsTriggeredOnceEnabled(): void + { + $this->expectOutputString("12"); + $this->start(function (Driver $loop): void { + $watcherId = $loop->onWritable(STDOUT, function () use ($loop): void { + echo 2; + $loop->stop(); + }); + $loop->disable($watcherId); + $loop->defer(function () use ($loop, $watcherId): void { + $loop->enable($watcherId); + echo 1; + }); + }); + } + + public function testStreamWatcherDoesntSwallowExceptions(): void + { + $this->expectException(\RuntimeException::class); + + $this->start(function (Driver $loop): void { + $loop->onWritable(STDOUT, function () { + throw new \RuntimeException(); + }); + $loop->delay(0.005, [$loop, "stop"]); + }); + } + + public function testReactorRunsUntilNoWatchersRemain(): void + { + $var1 = $var2 = 0; + $this->start(function (Driver $loop) use (&$var1, &$var2): void { + $loop->repeat(0.001, function ($watcherId) use ($loop, &$var1): void { + if (++$var1 === 3) { + $loop->cancel($watcherId); + } + }); + + $loop->onWritable(STDOUT, function ($watcherId) use ($loop, &$var2): void { + if (++$var2 === 4) { + $loop->cancel($watcherId); + } + }); + }); + self::assertSame(3, $var1); + self::assertSame(4, $var2); + } + + public function testReactorRunsUntilNoWatchersRemainWhenStartedDeferred(): void + { + $var1 = $var2 = 0; + $this->start(function (Driver $loop) use (&$var1, &$var2): void { + $loop->defer(function () use ($loop, &$var1, &$var2): void { + $loop->repeat(0.001, function ($watcherId) use ($loop, &$var1): void { + if (++$var1 === 3) { + $loop->cancel($watcherId); + } + }); + + $loop->onWritable(STDOUT, function ($watcherId) use ($loop, &$var2): void { + if (++$var2 === 4) { + $loop->cancel($watcherId); + } + }); + }); + }); + self::assertSame(3, $var1); + self::assertSame(4, $var2); + } + + public function testOptionalCallbackDataPassedOnInvocation(): void + { + $callbackData = new \StdClass(); + + $this->start(function (Driver $loop) use ($callbackData): void { + $loop->defer(function ($watcherId) use ($callbackData): void { + $callbackData->defer = true; + }); + $loop->delay(0.001, function ($watcherId) use ($callbackData): void { + $callbackData->delay = true; + }); + $loop->repeat(0.001, function ($watcherId) use ($loop, $callbackData): void { + $callbackData->repeat = true; + $loop->cancel($watcherId); + }); + $loop->onWritable(STDERR, function ($watcherId, $stream) use ($loop, $callbackData): void { + $callbackData->onWritable = true; + $loop->cancel($watcherId); + }); + }); + + self::assertTrue($callbackData->defer); + self::assertTrue($callbackData->delay); + self::assertTrue($callbackData->repeat); + self::assertTrue($callbackData->onWritable); + } + + public function testLoopStopPreventsTimerExecution(): void + { + $t = \microtime(1); + $this->start(function (Driver $loop): void { + $loop->defer(function () use ($loop): void { + $loop->delay(1, function (): void { + $this->fail("Timer was executed despite stopped loop"); + }); + }); + $loop->defer([$loop, "stop"]); + }); + self::assertGreaterThan(\microtime(1), $t + 0.1); + } + + public function testDeferEnabledInNextTick(): void + { + $tick = function () { + $this->loop->defer([$this->loop, "stop"]); + $this->loop->run(); + }; + + $invoked = 0; + + $watcher = $this->loop->onWritable(STDOUT, function () use (&$invoked): void { + $invoked++; + }); + + $tick(); + $tick(); + $tick(); + + $this->loop->disable($watcher); + $this->loop->enable($watcher); + $tick(); // disable + immediate enable after a tick should have no effect either + + self::assertSame(4, $invoked); + } + + public function testMicrotaskExecutedImmediatelyAfterWatcher(): void + { + self::expectOutputString('12835674'); + + $this->loop->queue(function (): void { + print 1; + }); + + $this->start(function (Driver $loop): void { + $loop->queue(function (): void { + print 2; + }); + + $loop->defer(function () use ($loop): void { + print 3; + + $loop->defer(function (): void { + print 4; + }); + + $loop->queue(function (): void { + print 5; + }); + + $loop->queue(function (): void { + print 6; + }); + }); + + $loop->defer(function (): void { + print 7; + }); + + $loop->queue(function (): void { + print 8; + }); + }); + } + + public function testMicrotaskThrowingStillExecutesNextMicrotask(): void + { + $exception = new \Exception(); + $invoked = false; + + try { + $this->start(function (Driver $loop) use (&$invoked, $exception): void { + $loop->queue(function () use ($exception): void { + throw $exception; + }); + + $loop->queue(function () use (&$invoked): void { + $invoked = true; + }); + }); + } catch (\Exception $e) { + self::assertSame($exception, $e); + } + + $this->start(fn () => null); + + self::assertTrue($invoked); + } + + public function testRethrowsFromCallbacks(): void + { + foreach (["onReadable", "onWritable", "defer", "delay", "repeat", "onSignal"] as $watcher) { + if ($watcher === "onSignal") { + $this->checkForSignalCapability(); + } + + try { + $args = []; + + switch ($watcher) { + case "onSignal": + $args[] = SIGUSR1; + break; + + case "onWritable": + $args[] = STDOUT; + break; + + case "onReadable": + $ends = \stream_socket_pair( + \stripos(PHP_OS, "win") === 0 ? STREAM_PF_INET : STREAM_PF_UNIX, + STREAM_SOCK_STREAM, + STREAM_IPPROTO_IP + ); + \fwrite($ends[0], "trigger readability watcher"); + $args[] = $ends[1]; + break; + + case "delay": + case "repeat": + $args[] = 0.005; + break; + } + + $args[] = function ($watcherId) { + $this->loop->cancel($watcherId); + throw new \Exception("rethrow test"); + }; + + [$this->loop, $watcher](...$args); + + if ($watcher === "onSignal") { + $this->loop->delay(0.1, function () { + \posix_kill(\getmypid(), \SIGUSR1); + }); + } + + $this->loop->run(); + + self::fail("Didn't throw expected exception."); + } catch (\Exception $e) { + self::assertSame("rethrow test", $e->getMessage()); + } + } + } + + public function testMultipleWatchersOnSameDescriptor(): void + { + $sockets = \stream_socket_pair( + \stripos(PHP_OS, "win") === 0 ? STREAM_PF_INET : STREAM_PF_UNIX, + STREAM_SOCK_STREAM, + STREAM_IPPROTO_IP + ); + \fwrite($sockets[1], "testing"); + + $invoked = 0; + $watcher1 = $this->loop->onReadable($sockets[0], function ($watcher) use (&$invoked): void { + $invoked += 1; + $this->loop->disable($watcher); + }); + $watcher2 = $this->loop->onReadable($sockets[0], function ($watcher) use (&$invoked): void { + $invoked += 10; + $this->loop->disable($watcher); + }); + $watcher3 = $this->loop->onWritable($sockets[0], function ($watcher) use (&$invoked): void { + $invoked += 100; + $this->loop->disable($watcher); + }); + $watcher4 = $this->loop->onWritable($sockets[0], function ($watcher) use (&$invoked): void { + $invoked += 1000; + $this->loop->disable($watcher); + }); + + $this->loop->defer(function () use ($watcher1, $watcher3): void { + $this->loop->delay(0.2, function () use ($watcher1, $watcher3): void { + $this->loop->enable($watcher1); + $this->loop->enable($watcher3); + }); + }); + + $this->loop->run(); + + self::assertSame(1212, $invoked); + + $this->loop->enable($watcher1); + $this->loop->enable($watcher3); + + $this->loop->delay(0.1, function () use ($watcher2, $watcher4) { + $this->loop->enable($watcher2); + $this->loop->enable($watcher4); + }); + + $this->loop->run(); + + self::assertSame(2323, $invoked); + } + + public function testStreamWritableIfConnectFails(): void + { + // first verify the operating system actually refuses the connection and no firewall is in place + // use higher timeout because Windows retires multiple times and has a noticeable delay + // @link https://stackoverflow.com/questions/19440364/why-do-failed-attempts-of-socket-connect-take-1-sec-on-windows + $errno = $errstr = null; + if ( + @\stream_socket_client('127.0.0.1:1', $errno, $errstr, 10) !== false + || (\defined('SOCKET_ECONNREFUSED') && $errno !== \SOCKET_ECONNREFUSED) + ) { + self::markTestSkipped('Expected host to refuse connection, but got error ' . $errno . ': ' . $errstr); + } + + $connecting = \stream_socket_client( + '127.0.0.1:1', + $errno, + $errstr, + 0, + STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT + ); + + $called = 0; + $writeWatcher = $this->loop->onWritable($connecting, function (string $watcher) use (&$called) { + ++$called; + + $this->loop->cancel($watcher); + }); + + $this->loop->unreference($this->loop->delay(10, function () use ($writeWatcher) { + $this->loop->cancel($writeWatcher); + })); + + $this->loop->run(); + + self::assertEquals(1, $called); + } + + public function testTimerIntervalCountedWhenNotRunning(): void + { + $this->loop->delay(1, function () use (&$start): void { + $this->assertLessThan(0.5, \microtime(true) - $start); + }); + + \usleep(600000); // 600ms instead of 500ms to allow for variations in timing. + $start = \microtime(true); + $this->loop->run(); + } + + public function testShortTimerDoesNotBlockOtherTimers(): void + { + $this->loop->repeat(0, function (): void { + static $i = 0; + + if (++$i === 5) { + $this->fail("Loop continues with repeat watcher"); + } + + \usleep(2000); + }); + + $this->loop->delay(0.002, function (): void { + $this->assertTrue(true); + $this->loop->stop(); + }); + + $this->loop->run(); + } + + public function testTwoShortRepeatTimersWorkAsExpected(): void + { + $this->loop->repeat(0, function () use (&$j): void { + static $i = 0; + if (++$i === 5) { + $this->loop->stop(); + } + $j = $i; + }); + $this->loop->repeat(0, function () use (&$k): void { + static $i = 0; + if (++$i === 5) { + $this->loop->stop(); + } + $k = $i; + }); + + $this->loop->run(); + self::assertLessThan(2, \abs($j - $k)); + self::assertNotSame(0, $j); + } + + protected function start($cb): void + { + $cb($this->loop); + $this->loop->run(); + } +} diff --git a/test/Driver/EvDriverTest.php b/test/Driver/EvDriverTest.php new file mode 100644 index 0000000..29e73d0 --- /dev/null +++ b/test/Driver/EvDriverTest.php @@ -0,0 +1,26 @@ +loop->getHandle()); + } + + public function testSupported(): void + { + self::assertTrue(EvDriver::isSupported()); + } +} diff --git a/test/Driver/EventDriverTest.php b/test/Driver/EventDriverTest.php new file mode 100644 index 0000000..59cf0f3 --- /dev/null +++ b/test/Driver/EventDriverTest.php @@ -0,0 +1,26 @@ +loop->getHandle()); + } + + public function testSupported(): void + { + self::assertTrue(EventDriver::isSupported()); + } +} diff --git a/test/Driver/StreamSelectDriverTest.php b/test/Driver/StreamSelectDriverTest.php new file mode 100644 index 0000000..288fa82 --- /dev/null +++ b/test/Driver/StreamSelectDriverTest.php @@ -0,0 +1,114 @@ +loop->getHandle()); + } + + /** + * @requires PHP 7.1 + */ + public function testAsyncSignals(): void + { + if (\DIRECTORY_SEPARATOR === '\\') { + self::markTestSkipped('Skip on Windows'); + } + + \pcntl_async_signals(true); + + try { + $this->start(function (Driver $loop) use (&$invoked) { + $watcher = $loop->onSignal(SIGUSR1, function () use (&$invoked) { + $invoked = true; + }); + $loop->unreference($watcher); + $loop->defer(function () { + \posix_kill(\getmypid(), \SIGUSR1); + }); + }); + } finally { + \pcntl_async_signals(false); + } + + self::assertTrue($invoked); + } + + public function testTooLargeFileDescriptorSet(): void + { + $sockets = []; + $domain = \stripos(PHP_OS, 'win') === 0 ? STREAM_PF_INET : STREAM_PF_UNIX; + + for ($i = 0; $i < 1001; $i++) { + $sockets[] = \stream_socket_pair($domain, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + } + + $this->expectException(\Exception::class); + $this->expectExceptionMessage("You have reached the limits of stream_select(). It has a FD_SETSIZE of 1024, but you have file descriptors numbered at least as high as 20"); + + $this->start(function (Driver $loop) use ($sockets) { + $loop->delay(0.1, function () { + // here to provide timeout to stream_select, as the warning is only issued after the system call returns + }); + + foreach ($sockets as [$left, $right]) { + $loop->onReadable($left, function () { + // nothing + }); + + $loop->onReadable($right, function () { + // nothing + }); + } + }); + } + + /** + * @requires extension pcntl + */ + public function testSignalDuringStreamSelectIgnored(): void + { + if (\DIRECTORY_SEPARATOR === '\\') { + self::markTestSkipped('Skip on Windows'); + } + + $sockets = \stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + + $this->start(function (Driver $loop) use ($sockets) { + $socketWatchers = [ + $loop->onReadable($sockets[0], function () { + // nothing + }), + $loop->onReadable($sockets[1], function () { + // nothing + }), + ]; + + $loop->onSignal(\SIGUSR2, function ($signalWatcher) use ($socketWatchers, $loop) { + $loop->cancel($signalWatcher); + + foreach ($socketWatchers as $watcher) { + $loop->cancel($watcher); + } + + $this->assertTrue(true); + }); + + $loop->delay(0.1, function () { + \proc_open('sh -c "sleep 1; kill -USR2 ' . \getmypid() . '"', [], $pipes); + }); + }); + } +} diff --git a/test/Driver/TimerQueueTest.php b/test/Driver/TimerQueueTest.php new file mode 100644 index 0000000..fc8d166 --- /dev/null +++ b/test/Driver/TimerQueueTest.php @@ -0,0 +1,68 @@ +insert($watcher); + } + $queue->remove($toRemove); + + \array_splice($values, $indexToRemove, 1); + \sort($values); + $output = []; + while (($extracted = $queue->extract(\PHP_INT_MAX)) !== null) { + $output[] = $extracted->expiration; + } + + self::assertSame($values, $output); + } +} diff --git a/test/Driver/TracingDriverTest.php b/test/Driver/TracingDriverTest.php new file mode 100644 index 0000000..cbcf726 --- /dev/null +++ b/test/Driver/TracingDriverTest.php @@ -0,0 +1,23 @@ +loop->getHandle(); + self::assertTrue(\is_resource($handle) || $handle instanceof \UVLoop); + } + + public function testSupported(): void + { + self::assertTrue(UvDriver::isSupported()); + } +} diff --git a/test/EventLoopTest.php b/test/EventLoopTest.php new file mode 100644 index 0000000..135174f --- /dev/null +++ b/test/EventLoopTest.php @@ -0,0 +1,197 @@ +expectException(\Error::class); + + EventLoop::delay(-1, fn () => null); + } + + public function testRepeatWithNegativeInterval(): void + { + $this->expectException(\Error::class); + + EventLoop::repeat(-1, fn () => null); + } + + public function testOnReadable(): void + { + if (!\class_exists(\Fiber::class, false)) { + self::markTestSkipped("Fibers required for this test"); + } + + $ends = \stream_socket_pair( + \stripos(PHP_OS, "win") === 0 ? STREAM_PF_INET : STREAM_PF_UNIX, + STREAM_SOCK_STREAM, + STREAM_IPPROTO_IP + ); + \fwrite($ends[0], "trigger readability watcher"); + + $count = 0; + $suspension = EventLoop::createSuspension(); + + EventLoop::onReadable($ends[1], function ($watcher) use (&$count, $suspension): void { + $this->assertTrue(true); + EventLoop::cancel($watcher); + $count++; + + $suspension->resume(null); + }); + + $suspension->suspend(); + + self::assertSame(1, $count); + } + + public function testOnWritable() + { + if (!\class_exists(\Fiber::class, false)) { + self::markTestSkipped("Fibers required for this test"); + } + + $count = 0; + $suspension = EventLoop::createSuspension(); + + EventLoop::onWritable(STDOUT, function ($watcher) use (&$count, $suspension): void { + $this->assertTrue(true); + EventLoop::cancel($watcher); + $count++; + + $suspension->resume(null); + }); + + $suspension->suspend(); + + self::assertSame(1, $count); + } + + public function testGet(): void + { + self::assertInstanceOf(Driver::class, EventLoop::getDriver()); + } + + public function testGetInfo(): void + { + self::assertSame(EventLoop::getDriver()->getInfo(), EventLoop::getInfo()); + } + + public function testRun(): void + { + $invoked = false; + EventLoop::defer(function () use (&$invoked): void { + $invoked = true; + }); + + EventLoop::run(); + + self::assertTrue($invoked); + } + + public function testRunInFiber(): void + { + if (!\class_exists(\Fiber::class, false)) { + self::markTestSkipped("Fibers required for this test"); + } + + launch(fn () => EventLoop::run()); + + $this->expectException(\Error::class); + $this->expectExceptionMessage("within a fiber"); + + EventLoop::run(); + } + + public function testRunAfterSuspension(): void + { + if (!\class_exists(\Fiber::class, false)) { + self::markTestSkipped("Fibers required for this test"); + } + + $suspension = EventLoop::createSuspension(); + + EventLoop::defer(fn () => $suspension->resume('test')); + + self::assertSame($suspension->suspend(), 'test'); + + $invoked = false; + EventLoop::defer(function () use (&$invoked): void { + $invoked = true; + }); + + EventLoop::run(); + + self::assertTrue($invoked); + } + + public function testSuspensionAfter(): void + { + if (!\class_exists(\Fiber::class, false)) { + self::markTestSkipped("Fibers required for this test"); + } + + $invoked = false; + EventLoop::defer(function () use (&$invoked): void { + $invoked = true; + }); + + EventLoop::run(); + + self::assertTrue($invoked); + + $suspension = EventLoop::createSuspension(); + + EventLoop::defer(fn () => $suspension->resume('test')); + + self::assertSame($suspension->suspend(), 'test'); + } + + public function testSuspensionWithinFiberWithinRun(): void + { + if (!\class_exists(\Fiber::class, false)) { + self::markTestSkipped("Fibers required for this test"); + } + + $invoked = false; + launch(function () use (&$invoked): void { + $suspension = EventLoop::createSuspension(); + + EventLoop::defer(fn () => $suspension->resume('test')); + + self::assertSame($suspension->suspend(), 'test'); + + $invoked = true; + }); + + EventLoop::run(); + + self::assertTrue($invoked); + } + + public function testSuspensionWithinWatcherCallback(): void + { + if (!\class_exists(\Fiber::class, false)) { + self::markTestSkipped("Fibers required for this test"); + } + + $send = 42; + + EventLoop::defer(static function () use (&$received, $send): void { + $suspension = EventLoop::createSuspension(); + EventLoop::defer(static fn () => $suspension->resume($send)); + $received = $suspension->suspend(); + }); + + + EventLoop::run(); + + self::assertSame($send, $received); + } +} diff --git a/tools/php-cs-fixer/composer.json b/tools/php-cs-fixer/composer.json new file mode 100644 index 0000000..029b32c --- /dev/null +++ b/tools/php-cs-fixer/composer.json @@ -0,0 +1,5 @@ +{ + "require": { + "friendsofphp/php-cs-fixer": "^3.0" + } +}