在laravel下实现全双工的websocket开发
php-fpm下的laravel
因为php-fpm的请求阅后即焚的特性,在php-fpm容器下,是无法建立一个长连接到任何服务器。
所以在php-fpm容器下,只能实现服务器主动下发消息到客户端,而客户端无法通过websocket协议上报数据到服务端。
大概的思路如下图:

其中的gateway server服务器可以用workerman的gateway实现。http api server则是你的laravel代码。
octane下的laravel
octane引擎是laravel官方出的、运行laravel代码的容器包。
该包的特性是:解决了php-fpm的请求阅后即焚的特性,让一些变量得以驻留在内存中被重复利用。
该包的意义是:
- 官方亲自下场解决变量污染问题。在octane未出现之前,社区已经有很多基于swoole驱动laravel的包,这些三方包在解决常驻内存下框架的变量污染问题都上算不上尽善尽美,假设如果要动框架底层才能解决某一个变量的污染问题,这个时候第三方包就有点束手束脚了。
- 官方引导社区走向常驻内存的时代。自octane之后,所有给larvel贡献composer包的作者,都会或多或少的考虑其作品在octane下是否有变量污染的问题。
该包的弱点是:该包虽然可以使用swoole作为底层驱动,但是其禁用了协程模式,依然无法建立一个长连接到任何服务器。
workerman下的laravel
既然octane解决了常驻内存下变量污染问题,又不支持建立一个长连接到任何服务器。
那我就用workerman驱动octane,用workerman替换掉octane的swoole层。
如此一来,我既享受了octane解决常驻内存下变量污染的便利,又提供了建立一个长连接到任何服务器的能力。
代码如下:
<?php namespace App\Console\Commands; use DateTime; use Illuminate\Console\Command; use Illuminate\Foundation\Application; use Illuminate\Http\Request as LaravelRequest; use Laravel\Octane\ApplicationFactory; use Laravel\Octane\Octane; use Laravel\Octane\OctaneResponse; use Laravel\Octane\RequestContext; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\HttpFoundation\BinaryFileResponse; use Throwable; use Workerman\Connection\TcpConnection; use Workerman\Protocols\Http\Request as WorkermanRequest; use Workerman\Protocols\Http\Response as WorkermanResponse; use Workerman\Worker as WorkermanWorker; use Laravel\Octane\Worker as OctaneWorker; use Laravel\Octane\Contracts\Client as OctaneClient; use Illuminate\Contracts\Events\Dispatcher as EventDispatcher; /** * 用workerman实现一个简单的laravel server * composer require workerman/workerman * composer require laravel/octane * php artisan octane:install */ class LaraManCommand extends Command { /** * 服务器启动之前调度的事件 * 自定义进程可以监听该事件进行启动 */ public const EVENT_LARAMAN_STARTING = 'EVENT_LARAMAN_STARTING'; protected WorkermanWorker|null $workermanWorker; protected OctaneWorker|null $octaneWorker; protected OctaneClient|null $octaneClient; protected function configure(): void { $this->setName('laraman'); $this->setDescription('Manage Server'); $this->addArgument('action', InputArgument::REQUIRED, 'start|stop|status|connections'); $this->addOption('host', null, InputOption::VALUE_OPTIONAL, 'The IP address the server should bind to'); $this->addOption('port', null, InputOption::VALUE_OPTIONAL, 'The port the server should be available on [default: "8000"]'); $this->addOption('workers', null, InputOption::VALUE_OPTIONAL, 'The number of workers that should be available to handle requests'); $this->addOption('d', null, InputOption::VALUE_NONE, 'Run in the DAEMON mode'); $this->addOption('g', null, InputOption::VALUE_NONE, 'Gracefully stop'); } public function __construct() { parent::__construct(); WorkermanWorker::$pidFile = storage_path('laraman.pid'); WorkermanWorker::$logFile = storage_path('logs/laraman.log'); } /** * Execute the console command. */ public function handle(): void { $action = $this->argument('action'); if ($action === 'start') { $this->start(); } elseif ($action === 'stop') { $this->stop(); } elseif ($action === 'status') { $this->status(); } elseif ($action === 'connections') { $this->connections(); } else { $this->error('Invalid action'); } } protected function start(): void { $host = $this->option('host') ?? '127.0.0.1'; $port = $this->option('port') ?? '8000'; $workers = $this->option('workers') ?? 'auto'; $socket = "http://$host:$port"; WorkermanWorker::$daemonize = (bool)$this->option('d'); $this->workermanWorker = new WorkermanWorker($socket); $this->workermanWorker->name = $this->getName(); $this->workermanWorker->count = $workers === 'auto' ? $this->getCpuNum() * 2 : (int)$workers; $_SERVER['VAR_DUMPER_FORMAT'] = WorkermanWorker::$daemonize ? 'server' : 'cli'; $this->workermanWorker->onWorkerStart = function () { $this->octaneClient = $this->getOctaneClient(); $this->octaneWorker = new OctaneWorker( new ApplicationFactory(base_path()), $this->octaneClient ); $this->octaneWorker->boot(); }; $this->workermanWorker->onMessage = function (TcpConnection $connection, WorkermanRequest $request) { $this->octaneWorker->handle(...$this->octaneClient->marshalRequest(new RequestContext([ 'request' => $request, 'tcpConnection' => $connection, ]))); }; $this->info("Starting server on $socket"); app(EventDispatcher::class)?->dispatch(self::EVENT_LARAMAN_STARTING, self::EVENT_LARAMAN_STARTING); WorkermanWorker::runAll(); } protected function stop(): void { WorkermanWorker::runAll(); } protected function status(): void { WorkermanWorker::runAll(); } protected function connections(): void { WorkermanWorker::runAll(); } /** * 获取cpu核心数 * @return int */ protected function getCpuNum(): int { if (DIRECTORY_SEPARATOR === '\\') { return 1; } $count = 4; if (is_callable('shell_exec')) { if (strtolower(PHP_OS) === 'darwin') { $count = (int)shell_exec('sysctl -n machdep.cpu.core_count'); } else { $count = (int)shell_exec('nproc'); } } return $count > 0 ? $count : 4; } /** * 获取OctaneClient * @return OctaneClient */ protected function getOctaneClient(): OctaneClient { return new class implements OctaneClient { protected const STATUS_CODE_REASONS = [ 419 => 'Page Expired', 425 => 'Too Early', 431 => 'Request Header Fields Too Large', // RFC6585 451 => 'Unavailable For Legal Reasons', // RFC7725 ]; public function marshalRequest(RequestContext $context): array { /** * @var TcpConnection $tcpConnection */ $tcpConnection = $context['tcpConnection']; /** * @var WorkermanRequest $workermanRequest */ $workermanRequest = $context['request']; $server = []; foreach ($workermanRequest->header() as $key => $value) { if ($key === 'cookie') { continue; } $server['HTTP_' . strtoupper(str_replace('-', '_', $key))] = $value; } $server['REQUEST_METHOD'] = $workermanRequest->method(); $server['PATH_INFO'] = $workermanRequest->path(); $server['REQUEST_URI'] = $workermanRequest->uri(); $server['QUERY_STRING'] = $workermanRequest->queryString(); $server['MASTER_TIME'] = $server['REQUEST_TIME'] = time(); $server['REQUEST_TIME_FLOAT'] = microtime(true); $server['SERVER_PROTOCOL'] = 'HTTP/' . $workermanRequest->protocolVersion(); $server['SERVER_PORT'] = $tcpConnection->getLocalPort(); $server['REMOTE_PORT'] = $tcpConnection->getRemotePort(); $server['REMOTE_ADDR'] = $tcpConnection->getRemoteIp(); $laravelRequest = new LaravelRequest( $workermanRequest->get() ?? [], $workermanRequest->post() ?? [], [], $workermanRequest->cookie() ?? [], $workermanRequest->file() ?? [], $server, $workermanRequest->rawBody() ); return [ $laravelRequest, $context, ]; } public function respond(RequestContext $context, OctaneResponse $response): void { /** * @var TcpConnection $tcpConnection */ $tcpConnection = $context['tcpConnection']; /** * @var WorkermanRequest $workermanRequest */ $workermanRequest = $context['request']; $symfonyResponse = $response->response; $workermanResponse = new WorkermanResponse(); $workermanResponse->withStatus($symfonyResponse->getStatusCode(), self::STATUS_CODE_REASONS[$symfonyResponse->getStatusCode()] ?? null); $workermanResponse->withProtocolVersion($symfonyResponse->getProtocolVersion()); if (!$symfonyResponse->headers->has('Date')) { $symfonyResponse->setDate(DateTime::createFromFormat('U', time())); } if ($symfonyResponse instanceof BinaryFileResponse) { $headers = $symfonyResponse->headers->allPreserveCase(); unset($headers['Content-Length'], $headers['Accept-Ranges'], $headers['Content-Range']); $workermanResponse->withHeaders($headers); $workermanResponse->withFile($symfonyResponse->getFile()->getPathname()); } else { $workermanResponse->withHeaders($symfonyResponse->headers->allPreserveCase()); $workermanResponse->withBody($symfonyResponse->getContent()); } $keepAlive = $workermanRequest->header('connection'); if ( ($keepAlive === null && $workermanRequest->protocolVersion() === '1.1') || $keepAlive === 'keep-alive' || $keepAlive === 'Keep-Alive' ) { $tcpConnection->send($workermanResponse); return; } $tcpConnection->close($workermanResponse); } public function error(Throwable $e, Application $app, LaravelRequest $request, RequestContext $context): void { $workermanResponse = new WorkermanResponse(); $workermanResponse->header('Status', '500 Internal Server Error'); $workermanResponse->header('Content-Type', 'text/plain'); $workermanResponse->withBody(Octane::formatExceptionForClient($e, $app->make('config')->get('app.debug'))); /** * @var TcpConnection $tcpConnection */ $tcpConnection = $context['tcpConnection']; $tcpConnection->close($workermanResponse); } }; } } 全双工的websocket开发
架构如下:
要实现上述架构,你可以选择workerman提供的gateway-worker长连接框架。
也可以选择我开发的https://github.com/buexplain/netsvr-business-serial。
以我的包为例子,结合上面的LaraManCommand类,你只需实现一个监听器即可,监听器代码如下:
ps:要将下面这个监听器跑通,请先仔细阅读我的包的readme文件:github.com/buexplain/netsvr-busine...
<?php namespace App\Listeners; use Exception; use Illuminate\Support\Facades\Log; use Laravel\Octane\Events\WorkerStarting; use Laravel\Octane\Events\WorkerStopping; use Netsvr\ConnClose; use Netsvr\ConnOpen; use Netsvr\Event; use Netsvr\Transfer; use NetsvrBusiness\Container; use NetsvrBusiness\Contract\EventInterface; use NetsvrBusiness\Contract\MainSocketManagerInterface; use NetsvrBusiness\Contract\TaskSocketMangerInterface; use NetsvrBusiness\MainSocketManager; use NetsvrBusiness\NetBus; use NetsvrBusiness\Socket; use NetsvrBusiness\TaskSocketManger; use NetsvrBusiness\Workerman\MainSocket; use NetsvrBusiness\Workerman\TaskSocket; use Psr\Container\ContainerInterface; class NetBusListener { /** * Create the event listener. */ public function __construct() { } /** * Handle the event. * @throws Exception */ public function handle(WorkerStarting|WorkerStopping|null $event): void { //初始化容器 /** * @var $container Container */ $container = Container::getInstance(); self::initTaskSocketMangerInterface($container); if ($event instanceof WorkerStarting) { self::initMainSocketManagerInterface($container); } else if ($event instanceof WorkerStopping) { //先关闭mainSocket $container->has(MainSocketManagerInterface::class) && $container->get(MainSocketManagerInterface::class)->close(); //再关闭taskSocket $container->has(TaskSocketMangerInterface::class) && $container->get(TaskSocketMangerInterface::class)->close(); } } /** * 初始化配置信息,配置信息最好放在框架规定的目录,我写在这里只是方便演示 * @return array */ protected static function getConfig(): array { return [ //如果一台网关服务机器承载不了业务的websocket连接数,可以再部署一台网关服务机器,这里支持配置多个网关服务,处理多个网关服务的websocket消息 'netsvr' => [ [ //netsvr网关的worker服务器监听的tcp地址 'workerAddr' => '127.0.0.1:6061', //该参数表示接下来,需要网关服务的worker服务器开启多少协程来处理mainSocket连接的请求 'processCmdGoroutineNum' => 25, //该参数表示接下来,需要网关服务的worker服务器转发如下事件给到business进程的mainSocket连接 'events' => Event::OnOpen | Event::OnClose | Event::OnMessage, ], ], //taskSocket的最大闲置时间,单位秒,建议比netsvr网关的worker服务器的ReadDeadline配置小3秒 'maxIdleTime' => 117, //socket读写网关数据的超时时间,单位秒 'sendReceiveTimeout' => 5, //连接到网关的超时时间,单位秒 'connectTimeout' => 5, //business进程向网关的worker服务器发送的心跳消息,这个字符串与网关的worker服务器的配置要一致,如果错误,网关的worker服务器是会强制关闭连接的 'workerHeartbeatMessage' => '~6YOt5rW35piO~', //维持心跳的间隔时间,单位毫秒 'heartbeatIntervalMillisecond' => 25 * 1000, ]; } /** * 初始化taskSocketManger * @param Container $container * @return void */ protected static function initTaskSocketMangerInterface(ContainerInterface $container): void { //这里只是绑定一个闭包,实际上并不会与netsvr网关进行连接,后续使用到了,才会进行连接 $container->bind(TaskSocketMangerInterface::class, function () { $taskSocketManger = new TaskSocketManger(); $logPrefix = sprintf('TaskSocket#%d', getmypid()); foreach (self::getConfig()['netsvr'] as $item) { //将网关的特定参数与公共参数进行合并,网关的特定参数覆盖公共参数 $item = array_merge(self::getConfig(), $item); //创建连接对象,并添加到管理器,如果不用这个对象,则不会与netsvr网关进行连接 $taskSocket = new TaskSocket( $logPrefix, Log::channel(), $item['workerAddr'], $item['sendReceiveTimeout'], $item['connectTimeout'], $item['maxIdleTime'], $item['workerHeartbeatMessage'], $item['heartbeatIntervalMillisecond'], ); $taskSocketManger->addSocket($taskSocket); } return $taskSocketManger; }); } /** * 初始化mainSocket,初始化成功后,会接收到来自netsvr网关转发过来的websocket事件 * @param Container $container * @return void * @throws Exception * @throws Exception */ public static function initMainSocketManagerInterface(ContainerInterface $container): void { $mainSocketManager = new MainSocketManager(); $logPrefix = sprintf('MainSocket#%d', getmypid()); $event = self::getEvent(); foreach (self::getConfig()['netsvr'] as $item) { //将网关的特定参数与公共参数进行合并,网关的特定参数覆盖公共参数 $item = array_merge(self::getConfig(), $item); //创建socket $socket = new Socket( $logPrefix, Log::channel(), $item['workerAddr'], $item['sendReceiveTimeout'], $item['connectTimeout']); //创建MainSocket连接 $mainSocket = new MainSocket( $logPrefix, Log::channel(), $event, $socket, $item['workerHeartbeatMessage'], $item['events'], $item['processCmdGoroutineNum'], $item['heartbeatIntervalMillisecond']); //添加到管理器 $mainSocketManager->addSocket($mainSocket); } //启动成功后,将mainSocketManager绑定到容器中,提供给NetBus类使用 if ($mainSocketManager->start()) { $container->bind(MainSocketManagerInterface::class, $mainSocketManager); } } /** * 获取事件对象,这个类应该创建一个文件,实现EventInterface接口,我写在这里是为了演示方便 * @return EventInterface */ protected static function getEvent(): EventInterface { return new class implements EventInterface { /** * 处理连接打开事件 * @param ConnOpen $connOpen * @return void */ public function onOpen(ConnOpen $connOpen): void { Log::channel()->info('onOpen ' . $connOpen->serializeToJsonString()); } /** * 处理消息事件 * @param Transfer $transfer * @return void */ public function onMessage(Transfer $transfer): void { //将消息转发给NetBus,NetBus会根据uniqId将消息转发给对应的客户端 NetBus::singleCast($transfer->getUniqId(), $transfer->getData()); } /** * 处理连接关闭事件 * @param ConnClose $connClose * @return void */ public function onClose(ConnClose $connClose): void { Log::channel()->info('onClose ' . $connClose->serializeToJsonString()); } }; } } 本作品采用《CC 协议》,转载必须注明作者和本文链接
关于 LearnKu
:+1:
优秀
比较好奇什么样的业务场景下需要双工通信
webman不香嘛? 常驻内存 fpm还是洗洗睡吧
sse 不是更好,服务器通知方式。apache、nginx 都支持的