Skip to content

Commit

Permalink
change workerman coroutine runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
crazywhalecc committed Mar 5, 2023
1 parent 6df72ec commit 22e4a71
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 11 deletions.
7 changes: 1 addition & 6 deletions src/OneBot/Driver/Event/EventDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace OneBot\Driver\Event;

use OneBot\Driver\Coroutine\Adaptive;
use OneBot\Driver\Interfaces\HandledDispatcherInterface;
use OneBot\Exception\ExceptionHandler;

Expand All @@ -14,12 +13,8 @@ class EventDispatcher implements HandledDispatcherInterface
/**
* 分发事件
*/
public function dispatch(object $event, bool $inside = false): object
public function dispatch(object $event): object
{
if (($co = Adaptive::getCoroutine()) !== null && !$inside) {
$co->create([$this, 'dispatch'], $event, true);
return $event;
}
foreach (ob_event_provider()->getEventListeners($event->getName()) as $listener) {
try {
// TODO: 允许 Listener 修改 $event
Expand Down
2 changes: 1 addition & 1 deletion src/OneBot/Driver/Socket/SocketFlag.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
trait SocketFlag
{
/** @var int */
protected $flag = 0;
protected $flag = 1;

public function setFlag(int $flag): self
{
Expand Down
35 changes: 31 additions & 4 deletions src/OneBot/Driver/Workerman/TopEventListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,37 @@ public function onWorkerStart(Worker $worker)
{
ProcessManager::initProcess(ONEBOT_PROCESS_WORKER, $worker->id);
Adaptive::initWithDriver(WorkermanDriver::getInstance());
ob_event_dispatcher()->dispatchWithHandler(new WorkerStartEvent());
if (($co = Adaptive::getCoroutine()) !== null) {
$co->create(fn () => ob_event_dispatcher()->dispatchWithHandler(new WorkerStartEvent()));
} else {
ob_event_dispatcher()->dispatchWithHandler(new WorkerStartEvent());
}
}

/**
* Workerman 的顶层 workerStop 事件回调
*/
public function onWorkerStop()
{
ob_event_dispatcher()->dispatchWithHandler(new WorkerStopEvent());
if (($co = Adaptive::getCoroutine()) !== null) {
$co->create(fn () => ob_event_dispatcher()->dispatchWithHandler(new WorkerStopEvent()));
} else {
ob_event_dispatcher()->dispatchWithHandler(new WorkerStopEvent());
}
}

/**
* Workerman 的顶层 onWebSocketConnect 事件回调
*
* @param TcpConnection $connection 连接本身
* @param mixed $data 数据
*/
public function onWebSocketOpen(array $config, TcpConnection $connection, $data)
public function onWebSocketOpen(array $config, TcpConnection $connection)
{
// 协程套娃
if (($co = Adaptive::getCoroutine()) !== null && $co->getCid() === -1) {
$co->create([$this, 'onWebSocketOpen'], $config, $connection);
return;
}
// WebSocket 隐藏特性: _SERVER 全局变量会在 onWebSocketConnect 中被替换为当前连接的 Header 相关信息
try {
global $_SERVER;
Expand Down Expand Up @@ -91,6 +103,11 @@ public function onWebSocketOpen(array $config, TcpConnection $connection, $data)
*/
public function onWebSocketClose(array $config, TcpConnection $connection)
{
// 协程套娃
if (($co = Adaptive::getCoroutine()) !== null && $co->getCid() === -1) {
$co->create([$this, 'onWebSocketClose'], $config, $connection);
return;
}
if (($connection->worker instanceof Worker) && ($socket = WorkermanDriver::getInstance()->getWSServerSocketByWorker($connection->worker)) !== null) {
unset($socket->connections[$connection->id]);
} else {
Expand All @@ -110,6 +127,11 @@ public function onWebSocketClose(array $config, TcpConnection $connection)
*/
public function onWebSocketMessage(array $config, TcpConnection $connection, $data)
{
// 协程套娃
if (($co = Adaptive::getCoroutine()) !== null && $co->getCid() === -1) {
$co->create([$this, 'onWebSocketMessage'], $config, $connection, $data);
return;
}
try {
ob_logger()->debug('WebSocket message from: ' . $connection->id);
$frame = FrameFactory::createTextFrame($data);
Expand All @@ -132,6 +154,11 @@ public function onWebSocketMessage(array $config, TcpConnection $connection, $da

public function onHttpRequest(array $config, TcpConnection $connection, Request $request)
{
// 协程套娃
if (($co = Adaptive::getCoroutine()) !== null && $co->getCid() === -1) {
$co->create([$this, 'onHttpRequest'], $config, $connection, $request);
return;
}
$port = $connection->getLocalPort();
ob_logger()->debug('Http request from ' . $port . ': ' . $request->uri());
$event = new HttpRequestEvent(HttpFactory::createServerRequest(
Expand Down

0 comments on commit 22e4a71

Please sign in to comment.