基于 Swoole 开发实时在线聊天室(六):建立 socket.io 客户端与 Swoole Websocket 服务器的连接
服务端改造方案
完成开发环境、后端 Websocket 服务器的搭建以及前端资源的初始化后,接下来,我们正式开始调试前后端接口完成在线聊天室功能的开发。
首先我们要做的是建立客户端与服务端的 Websocket 连接和通信,这里,我们的 Websocket 客户端使用的是 socket.io-client,服务端使用的是基于 Swoole 的 LaravelS 扩展包提供的 WebSocket 服务器,由于 socket.io 有一套自己的连接建立和数据编码机制,所以必须要对原来的 Websocket 服务器实现做调整,否则无法建立 WebSocket 连接。
LaravelS 扩展包对 Socket.io 客户端的支持并不友好,不过另一个流行的 Laravel Swoole 扩展包 Laravel-Swoole则对其有很好的支持,甚至可以说就是对 socket.io 客户端的 PHP 服务端适配,具体可以参考其官方文档说明,所以很自然的,我们可以将它的这部分实现移植到 LaravelS 中来。
本项目代码已提交到 Github 代码仓库:https://github.com/nonfu/webchat,你可以从这里下载代码进行比对查看。
编写数据解析器 Parser
socket.io 客户端发送接收的数据格式有自己的规则要求,我们需要按照这个规则在服务端实现对应的数据解析器,以便在处理数据前先对接收到的客户端数据进行解码,然后在处理之后,将数据发送给客户端之前对其进行编码,以便客户端可以正确解析。
这部分逻辑直接从 Laravel-Swoole 扩展包照搬,首先在 app/Services
目录下创建 WebSocket
子目录用于存放 WebSocket 服务器相关代码,然后把之前创建的 WebSocketHandler
处理器类搬进来,同时不要忘了修改 config/laravels.php
配置文件中的 WebSocketHandler
路径:
'websocket' => [
'enable' => true,
'handler' => \App\Services\WebSocket\WebSocketHandler::class,
],
接下来,开始编写数据解析器,在 app/Services/WebSocket
目录下创建一个抽象基类 Parser
,初始化代码如下:
<?php
/**
* 数据编码解码器抽象基类
*/
namespace App\Services\WebSocket;
use Illuminate\Support\Facades\App;
abstract class Parser
{
/**
* Strategy classes need to implement handle method.
*/
protected $strategies = [];
/**
* Execute strategies before decoding payload.
* If return value is true will skip decoding.
*
* @param \Swoole\WebSocket\Server $server
* @param \Swoole\WebSocket\Frame $frame
*
* @return boolean
*/
public function execute($server, $frame)
{
$skip = false;
foreach ($this->strategies as $strategy) {
$result = App::call(
$strategy . '@handle',
[
'server' => $server,
'frame' => $frame,
]
);
if ($result === true) {
$skip = true;
break;
}
}
return $skip;
}
/**
* Encode output payload for websocket push.
*
* @param string $event
* @param mixed $data
*
* @return mixed
*/
abstract public function encode(string $event, $data);
/**
* Input message on websocket connected.
* Define and return event name and payload data here.
*
* @param \Swoole\Websocket\Frame $frame
*
* @return array
*/
abstract public function decode($frame);
}
然后在 WebSocket
目录下创建子目录 SocketIO
用于存放与 socket.io 客户端交互的相关代码。首先是 socket.io 客户端对应的数据解析器 SocketIOParser
,该类继承自 Parser
:
<?php
/**
* Socket.io 对应数据编解码器
*/
namespace App\Services\WebSocket\SocketIO;
use App\Services\WebSocket\Parser;
use App\Services\WebSocket\SocketIO\Strategies\HeartbeatStrategy;
class SocketIOParser extends Parser
{
/**
* Strategy classes need to implement handle method.
*/
protected $strategies = [
HeartbeatStrategy::class,
];
/**
* Encode output payload for websocket push.
*
* @param string $event
* @param mixed $data
*
* @return mixed
*/
public function encode(string $event, $data)
{
$packet = Packet::MESSAGE . Packet::EVENT;
$shouldEncode = is_array($data) || is_object($data);
$data = $shouldEncode ? json_encode($data) : $data;
$format = $shouldEncode ? '["%s",%s]' : '["%s","%s"]';
return $packet . sprintf($format, $event, $data);
}
/**
* Decode message from websocket client.
* Define and return payload here.
*
* @param \Swoole\Websocket\Frame $frame
*
* @return array
*/
public function decode($frame)
{
$payload = Packet::getPayload($frame->data);
return [
'event' => $payload['event'] ?? null,
'data' => $payload['data'] ?? null,
];
}
}
这里面用到了封装的 Packet
类对通信数据进行解析处理:
<?php
/**
* Socket.io 通信数据解析底层类
*/
namespace App\Services\WebSocket\SocketIO;
class Packet
{
/**
* Socket.io packet type `open`.
*/
const OPEN = 0;
/**
* Socket.io packet type `close`.
*/
const CLOSE = 1;
/**
* Socket.io packet type `ping`.
*/
const PING = 2;
/**
* Socket.io packet type `pong`.
*/
const PONG = 3;
/**
* Socket.io packet type `message`.
*/
const MESSAGE = 4;
/**
* Socket.io packet type 'upgrade'
*/
const UPGRADE = 5;
/**
* Socket.io packet type `noop`.
*/
const NOOP = 6;
/**
* Engine.io packet type `connect`.
*/
const CONNECT = 0;
/**
* Engine.io packet type `disconnect`.
*/
const DISCONNECT = 1;
/**
* Engine.io packet type `event`.
*/
const EVENT = 2;
/**
* Engine.io packet type `ack`.
*/
const ACK = 3;
/**
* Engine.io packet type `error`.
*/
const ERROR = 4;
/**
* Engine.io packet type 'binary event'
*/
const BINARY_EVENT = 5;
/**
* Engine.io packet type `binary ack`. For acks with binary arguments.
*/
const BINARY_ACK = 6;
/**
* Socket.io packet types.
*/
public static $socketTypes = [
0 => 'OPEN',
1 => 'CLOSE',
2 => 'PING',
3 => 'PONG',
4 => 'MESSAGE',
5 => 'UPGRADE',
6 => 'NOOP',
];
/**
* Engine.io packet types.
*/
public static $engineTypes = [
0 => 'CONNECT',
1 => 'DISCONNECT',
2 => 'EVENT',
3 => 'ACK',
4 => 'ERROR',
5 => 'BINARY_EVENT',
6 => 'BINARY_ACK',
];
/**
* Get socket packet type of a raw payload.
*
* @param string $packet
*
* @return int|null
*/
public static function getSocketType(string $packet)
{
$type = $packet[0] ?? null;
if (! array_key_exists($type, static::$socketTypes)) {
return null;
}
return (int) $type;
}
/**
* Get data packet from a raw payload.
*
* @param string $packet
*
* @return array|null
*/
public static function getPayload(string $packet)
{
$packet = trim($packet);
$start = strpos($packet, '[');
if ($start === false || substr($packet, -1) !== ']') {
return null;
}
$data = substr($packet, $start, strlen($packet) - $start);
$data = json_decode($data, true);
if (is_null($data)) {
return null;
}
return [
'event' => $data[0],
'data' => $data[1] ?? null,
];
}
/**
* Return if a socket packet belongs to specific type.
*
* @param $packet
* @param string $typeName
*
* @return bool
*/
public static function isSocketType($packet, string $typeName)
{
$type = array_search(strtoupper($typeName), static::$socketTypes);
if ($type === false) {
return false;
}
return static::getSocketType($packet) === $type;
}
}
此外在 SocketIOParser
中还引入了对心跳连接进行处理的策略,所谓心跳连接指的是为了保持长连接而每隔一定时间进行通信的连接,通常这些通信不需要被处理可以被忽略掉,而这里也是这么做的。
心跳连接策略类保存在 SocketIO/Strategies
目录下:
<?php
/**
* 心跳连接处理策略类
*/
namespace App\Services\WebSocket\SocketIO\Strategies;
use App\Services\WebSocket\SocketIO\Packet;
class HeartbeatStrategy
{
/**
* If return value is true will skip decoding.
*
* @param \Swoole\WebSocket\Server $server
* @param \Swoole\WebSocket\Frame $frame
*
* @return boolean
*/
public function handle($server, $frame)
{
$packet = $frame->data;
$packetLength = strlen($packet);
$payload = '';
if (Packet::getPayload($packet)) {
return false;
}
if ($isPing = Packet::isSocketType($packet, 'ping')) {
$payload .= Packet::PONG;
}
if ($isPing && $packetLength > 1) {
$payload .= substr($packet, 1, $packetLength - 1);
}
if ($isPing) {
$server->push($frame->fd, $payload);
}
return true;
}
}
至此,我们的通信数据解析器就全部完成了,接下来,我们来看一下封装的通信数据发送类。
编写数据发送类 Pusher
重构后的 WebSocketHandler
类将只承担路由和控制器功能,涉及业务逻辑的相关服务都会被剥离到独立的业务单元中完成,包括数据发送,因为我们需要对其进行统一的封装处理,以便能够被客户端解析。在 app/Services/WebSocket
目录下创建 Pusher
类用于数据发送:
<?php
/**
* 通信数据发送类
*/
namespace App\Services\WebSocket;
class Pusher
{
/**
* @var \Swoole\Websocket\Server
*/
protected $server;
/**
* @var int
*/
protected $opcode;
/**
* @var int
*/
protected $sender;
/**
* @var array
*/
protected $descriptors;
/**
* @var bool
*/
protected $broadcast;
/**
* @var bool
*/
protected $assigned;
/**
* @var string
*/
protected $event;
/**
* @var mixed|null
*/
protected $message;
/**
* Push constructor.
*
* @param int $opcode
* @param int $sender
* @param array $descriptors
* @param bool $broadcast
* @param bool $assigned
* @param string $event
* @param mixed|null $message
* @param \Swoole\Websocket\Server
*/
protected function __construct(
int $opcode,
int $sender,
array $descriptors,
bool $broadcast,
bool $assigned,
string $event,
$message = null,
$server
)
{
$this->opcode = $opcode;
$this->sender = $sender;
$this->descriptors = $descriptors;
$this->broadcast = $broadcast;
$this->assigned = $assigned;
$this->event = $event;
$this->message = $message;
$this->server = $server;
}
/**
* Static constructor
*
* @param array $data
* @param \Swoole\Websocket\Server $server
*
* @return Pusher
*/
public static function make(array $data, $server)
{
return new static(
$data['opcode'] ?? 1,
$data['sender'] ?? 0,
$data['fds'] ?? [],
$data['broadcast'] ?? false,
$data['assigned'] ?? false,
$data['event'] ?? null,
$data['message'] ?? null,
$server
);
}
/**
* @return int
*/
public function getOpcode(): int
{
return $this->opcode;
}
/**
* @return int
*/
public function getSender(): int
{
return $this->sender;
}
/**
* @return array
*/
public function getDescriptors(): array
{
return $this->descriptors;
}
/**
* @param int $descriptor
*
* @return self
*/
public function addDescriptor($descriptor): self
{
return $this->addDescriptors([$descriptor]);
}
/**
* @param array $descriptors
*
* @return self
*/
public function addDescriptors(array $descriptors): self
{
$this->descriptors = array_values(
array_unique(
array_merge($this->descriptors, $descriptors)
)
);
return $this;
}
/**
* @param int $descriptor
*
* @return bool
*/
public function hasDescriptor(int $descriptor): bool
{
return in_array($descriptor, $this->descriptors);
}
/**
* @return bool
*/
public function isBroadcast(): bool
{
return $this->broadcast;
}
/**
* @return bool
*/
public function isAssigned(): bool
{
return $this->assigned;
}
/**
* @return string
*/
public function getEvent(): string
{
return $this->event;
}
/**
* @return mixed|null
*/
public function getMessage()
{
return $this->message;
}
/**
* @return \Swoole\Websocket\Server
*/
public function getServer()
{
return $this->server;
}
/**
* @return bool
*/
public function shouldBroadcast(): bool
{
return $this->broadcast && empty($this->descriptors) && ! $this->assigned;
}
/**
* Returns all descriptors that are websocket
*
* @param \Swoole\Connection\Iterator $descriptors
*
* @return array
*/
protected function getWebsocketConnections(): array
{
return array_filter(iterator_to_array($this->server->connections), function ($fd) {
return $this->server->isEstablished($fd);
});
}
/**
* @param int $fd
*
* @return bool
*/
public function shouldPushToDescriptor(int $fd): bool
{
if (! $this->server->isEstablished($fd)) {
return false;
}
return $this->broadcast ? $this->sender !== (int) $fd : true;
}
/**
* Push message to related descriptors
*
* @param mixed $payload
*
* @return void
*/
public function push($payload): void
{
// attach sender if not broadcast
if (! $this->broadcast && $this->sender && ! $this->hasDescriptor($this->sender)) {
$this->addDescriptor($this->sender);
}
// check if to broadcast to other clients
if ($this->shouldBroadcast()) {
$this->addDescriptors($this->getWebsocketConnections());
}
// push message to designated fds
foreach ($this->descriptors as $descriptor) {
if ($this->shouldPushToDescriptor($descriptor)) {
$this->server->push($descriptor, $payload, $this->opcode);
}
}
}
}
该类主要用于数据处理后发送给客户端的业务逻辑处理,包括数据解析和统一封装、是否广播等。
编写 WebSocket
服务类
除了简单的数据接收和发送之外,我们的在线聊天室还有很多其它复杂功能,所以有必要创建一个单独的服务类来实现这些功能,比如房间的加入和退出、用户的认证和获取、数据的发送和广播等,最终将会在这里调用 Pusher
类发送数据,可以说这个服务类是整个 WebSocket 后端服务的核心,不过本篇教程先简化了这个部分,只是拷贝过来一个空类占位,更多功能将在后续教程中逐步加进来:
<?php
namespace App\Services\WebSocket;
class WebSocket
{
const PUSH_ACTION = 'push';
const EVENT_CONNECT = 'connect';
const USER_PREFIX = 'uid_';
/**
* Determine if to broadcast.
*
* @var boolean
*/
protected $isBroadcast = false;
/**
* Scoket sender's fd.
*
* @var integer
*/
protected $sender;
/**
* Recepient's fd or room name.
*
* @var array
*/
protected $to = [];
/**
* Websocket event callbacks.
*
* @var array
*/
protected $callbacks = [];
}
在这篇教程中,为了简化流程,我们在 WebSocketHandler
中直接调用 Pusher
类发送数据到客户端,以便快速演示 WebSocket 通信连接的建立。
重写 WebSocketHandler
处理器实现
最后,我们按照新的代码结构重新实现 WebSocketHandler
处理器的 onOpen
和 onMessage
方法:
<?php
/**
* WebSocket 服务通信处理器类
* Author:学院君
*/
namespace App\Services\WebSocket;
use App\Services\WebSocket\SocketIO\SocketIOParser;
use Hhxsv5\LaravelS\Swoole\WebSocketHandlerInterface;
use Illuminate\Support\Facades\Log;
use Swoole\Http\Request;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
class WebSocketHandler implements WebSocketHandlerInterface
{
/**
* @var WebSocket
*/
protected $websocket;
/**
* @var Parser
*/
protected $parser;
public function __construct()
{
$this->websocket = app(WebSocket::class);
$this->parser = app(SocketIOParser::class);
}
// 连接建立时触发
public function onOpen(Server $server, Request $request)
{
if (!request()->input('sid')) {
// 初始化连接信息适配 socket.io-client,这段代码不能省略,否则无法建立连接
$payload = json_encode([
'sid' => base64_encode(uniqid()),
'upgrades' => [],
'pingInterval' => config('laravels.swoole.heartbeat_idle_time') * 1000,
'pingTimeout' => config('laravels.swoole.heartbeat_check_interval') * 1000,
]);
$initPayload = Packet::OPEN . $payload;
$connectPayload = Packet::MESSAGE . Packet::CONNECT;
$server->push($request->fd, $initPayload);
$server->push($request->fd, $connectPayload);
}
Log::info('WebSocket 连接建立:' . $request->fd);
$payload = [
'sender' => $request->fd,
'fds' => [$request->fd],
'broadcast' => false,
'assigned' => false,
'event' => 'message',
'message' => '欢迎访问聊天室',
];
$pusher = Pusher::make($payload, $server);
$pusher->push($this->parser->encode($pusher->getEvent(), $pusher->getMessage()));
}
// 收到消息时触发
public function onMessage(Server $server, Frame $frame)
{
// $frame->fd 是客户端 id,$frame->data 是客户端发送的数据
Log::info("从 {$frame->fd} 接收到的数据: {$frame->data}");
if ($this->parser->execute($server, $frame)) {
// 跳过心跳连接处理
return;
}
$payload = $this->parser->decode($frame);
['event' => $event, 'data' => $data] = $payload;
$payload = [
'sender' => $frame->fd,
'fds' => [$frame->fd],
'broadcast' => false,
'assigned' => false,
'event' => $event,
'message' => $data,
];
$pusher = Pusher::make($payload, $server);
$pusher->push($this->parser->encode($pusher->getEvent(), $pusher->getMessage()));
}
// 连接关闭时触发
public function onClose(Server $server, $fd, $reactorId)
{
Log::info('WebSocket 连接关闭:' . $fd);
}
}
首先在构造函数中初始化 $websocket
和 $parser
属性,然后在连接建立回调方法 onOpen
中,判断请求数据中是否包含 sid
字段,没有包含需要将连接初始化信息发送给客户端,以便成功建立 WebSocket 连接,这里我们还指定了心跳间隔时间和超时时间,相应的,我们还需要在配置文件 config/laravels.php
的 swoole
配置项中新增如下两个配置:
'heartbeat_idle_time' => 600,
'heartbeat_check_interval' => 60,
接下来调用 Pusher
类的 push
方法发送经过编码的欢迎信息以便被 socket.io 客户端解析。
在收到消息的回调方法 onMessage
中,首先调用 Parser
类的 execute
方法判断是否是心跳连接,如果是心跳连接的话跳过不做处理,否则的话将收到的信息进行解码,经过简单处理后,再经由 Pusher
类的 push
方法发送回给客户端。
至此,适配 socket.io 客户端的 Swoole WebSocket 服务端连接建立和简单通信逻辑已经初步实现了,接下来,我们需要重启下 Swoole WebSocket 服务器让代码生效:
bin/laravels restart
socket.io 客户端代码调整
最后,我们修改下 socket.io 客户端的连接代码,打开 resources/js/socket.js
修改连接建立代码如下:
// 通过 socket.io 客户端进行 WebSocket 通信
import io from 'socket.io-client';
const socket = io('http://webchats.test', {
path: '/ws',
transports: ['websocket']
});
export default socket;
这里我们设置服务器地址为基于 Swoole HTTP 服务器驱动的 http://webchats.test
,然后将路径设置为 /ws
,以便连接到 Swoole WebSocket 服务器,最后设置传输层协议为 websocket
,取代默认的长轮询(polling
)机制。
另外前端 Vue 组件和视图文件以及主入口 JavaScript 文件 app.js
也做了一些细微调整,对应代码以代码仓库 https://github.com/nonfu/webchat 中的最新提交版本为准,比如在 app.js
中,我们打印了 Socket 连接是否建立的日志:
socket.on('connect', async () => {
console.log('websocket connected: ' + socket.connected);
...
接下来,我们运行 npm run dev
重新编译资源,在浏览器中访问 http://webchats.test
,通过 F12 就可以看到 WebSocket 连接建立和通信数据了:
在控制台 Console
标签页也可以看到如下日志:
websocket connected: true
表明连接建立成功。
本项目代码已提交到 Github 代码仓库:https://github.com/nonfu/webchat。
21 Comments
很好奇 这里的客户端不能直接用websocket的客户端吗
可以 用 socket.io 懒得自己封装了
还有一个问题需要请教一下,比如说我现在是使用passport做api的认证。在登录成功之后转为websocket ,使用vue-cookie 保存状态 如果使用redis 把fd 和 user_id 做双向绑定,刷新一次页面意味着重新绑定,是否意味着每次初始化连接websocket 的使用 我都需要去验证 客户端的 access_token?
那肯定是的 http请求不也是一样吗 因为请求本身都是无状态的 所以才要借助 cookie、token 这些请求字段去识别
关心: on.connect 和 on.disconnect 就可以了。当然,前提是授权通过后。这个和 socket.io 又没有必然的联系。其他楼主把中间件这块处理好,就方便多了。
login
的逻辑放在connect
的时候不更好么。以客户端通过接口调用登录,服务端用jwt
实现为例。客户端(js):
服务端:
App\Services\WebSocket\WebSocketHandler
连接成功后,如果客户端有需要进房间的必要,再
joinRoom
。感谢建议 这个思路不错
这样还有一个问题。虽然让用户最终是没连上。但是 socket.io 自动重连的机制会让它一直连一直断一直连一直断。这个还需要额外处理。
学院君 你好! App\Services\WebSocket\WebSocketHandler类有笔误 (use App\Services\WebSocket\SocketIO\SocketIOParser这个类重复了)
感谢反馈 已修正