基于 Swoole 开发实时在线聊天室(三):后台 WebSocket 服务器实现
今天我们继续聊天室项目后台功能的开发,现在,后端数据库已经就绪,基于 API Token 的用户认证功能已经实现,接下来,我们来实现聊天室功能的核心 —— WebSocket 服务器相关业务逻辑的实现。
创建 WebSocketHandler
首先我们在 app/Services
目录下创建用于处理 WebSocket 通信的 WebSocketHandler.php
,并初始化 WebSocketHandler
类代码如下:
<?php
namespace App\Services;
use Hhxsv5\LaravelS\Swoole\WebSocketHandlerInterface;
use Illuminate\Support\Facades\Log;
use Swoole\Http\Request;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
class WebSocketHandler implements WebSocketHandlerInterface
{
public function __construct()
{
// 构造函数即使为空,也不能省略
}
// 连接建立时触发
public function onOpen(Server $server, Request $request)
{
Log::info('WebSocket 连接建立:' . $request->fd);
}
// 收到消息时触发
public function onMessage(Server $server, Frame $frame)
{
// $frame->fd 是客户端 id,$frame->data 是客户端发送的数据
Log::info("从 {$frame->fd} 接收到的数据: {$frame->data}");
foreach ($server->connections as $fd) {
if (!$server->isEstablished($fd)) {
// 如果连接不可用则忽略
continue;
}
$server->push($fd, $frame->data); // 服务端通过 push 方法向所有客户端广播消息
}
}
// 连接关闭时触发
public function onClose(Server $server, $fd, $reactorId)
{
Log::info('WebSocket 连接关闭:' . $fd);
}
}
该类实现了 WebSocketHandlerInterface
接口,所以必须实现接口约定的构造函数、onOpen
、onMessage
以及 onClose
方法,每个方法的具体功能我们在前面在 Laravel 中集成 Swoole 实现 WebSocket 服务器已经介绍过,在建立连接和断开连接时,我们只是打印一条日志记录,在接收到来自客户端的消息时,我们会记录该消息并将其广播给每个与该 WebSocket 服务器连接的客户端(对应逻辑定义在 onMessage
方法中)。
异步事件监听与处理
当然,我们在构建聊天室项目时,实现的业务功能要比前面实现的弹幕功能要复杂,除了将消息广播给所有客户端之外,还要保存消息到数据库,而且还会校验用户是否登录,未登录用户不能发送消息,下面我们先来处理消息保存到数据库的实现。
由于操作数据库是一个涉及到网络 IO 的耗时操作,所以这里我们通过 Swoole 提供的异步事件监听机制将其转交给 Task Worker 去处理,从而提高 WebSocket 服务器的通信性能。
首先,借助 Laravel 的 Artisan 命令创建消息接收事件 MessageReceived
:
php artisan make:event MessageReceived
然后修改生成的 app/Events/MessageReceived.php
代码如下:
<?php
namespace App\Events;
use App\Message;
use Hhxsv5\LaravelS\Swoole\Task\Event;
class MessageReceived extends Event
{
private $message;
private $userId;
/**
* Create a new event instance.
*/
public function __construct($message, $userId = 0)
{
$this->message = $message;
$this->userId = $userId;
}
/**
* Get the message data
*
* return App\Message
*/
public function getData()
{
$model = new Message();
$model->room_id = $this->message->room_id;
$model->msg = $this->message->type == 'text' ? $this->message->content : '';
$model->img = $this->message->type == 'image' ? $this->message->image : '';
$model->user_id = $this->userId;
$model->created_at = Carbon::now();
return $model;
}
}
这个事件类只是对传入的数据进行格式转化,这里我们从外部传入消息对象和用户ID,然后通过 getData
方法将其组合为 Message
模型实例并返回。
由于 Message
模型只包含创建时间,不包含更新时间,所以我们显式指定了 created_at
字段,另外还要将 Message
模型类的 $timestamps
属性设置为 false
,以避免系统自动为其设置时间字段。
class Message extends Model
{
public $timestamps = false;
}
然后,创建消息监听器 MessageListener
对上述 MessageReceived
事件进行处理:
php artisan make:listener MessageListener
修改最新生成的 app/Listeners/MessageListener.php
文件代码如下:
<?php
namespace App\Listeners;
use App\Events\MessageReceived;
use Hhxsv5\LaravelS\Swoole\Task\Listener;
use Illuminate\Support\Facades\Log;
class MessageListener extends Listener
{
/**
* Create the event listener.
*
* @return void
*/
public function __construct()
{
//
}
/**
* Handle the event.
*
* @param MessageReceived $event
* @return void
*/
public function handle($event)
{
$message = $event->getData();
Log::info(__CLASS__ . ': 开始处理', $message->toArray());
if ($message && $message->user_id && $message->room_id && ($message->msg || $message->img)) {
$message->save();
Log::info(__CLASS__ . ': 处理完毕');
} else {
Log::error(__CLASS__ . ': 消息字段缺失,无法保存');
}
}
}
在消息监听器中,通过 handle
方法对事件进行处理,传入的 $event
参数对应上述 MessageReceived
对象实例,然后我们在该方法中对消息数据进行校验和保存,同时打印相应的日志信息。
用户认证校验和消息接收事件触发
有了消息接收事件和消息事件监听器后,接下来,我们需要在 WebSocket 服务器收到消息时触发消息接收事件,这个业务逻辑可以在 WebSocketHandler
的 onMessage
方法中完成,修改 onMessage
实现代码如下:
use App\Events\MessageReceived;
use Hhxsv5\LaravelS\Swoole\Task\Event;
use App\User;
// 收到消息时触发
public function onMessage(Server $server, Frame $frame)
{
// $frame->fd 是客户端 id,$frame->data 是客户端发送的数据
Log::info("从 {$frame->fd} 接收到的数据: {$frame->data}");
$message = json_decode($frame->data);
// 基于 Token 的用户认证校验
if (empty($message->token) || !($user = User::where('api_token', $message->token)->first())) {
Log::warning("用户" . $message->name . "已经离线,不能发送消息");
$server->push($frame->fd, "离线用户不能发送消息"); // 告知用户离线状态不能发送消息
} else {
// 触发消息接收事件
event = new MessageReceived($message, $user->id);
Event::fire($event);
unset($message->token); // 从消息中去掉当前用户令牌字段
foreach ($server->connections as $fd) {
if (!$server->isEstablished($fd)) {
// 如果连接不可用则忽略
continue;
}
$server->push($fd, json_encode($message)); // 服务端通过 push 方法向所有连接的客户端发送数据
}
}
}
在这里,我们首先对接收到的数据进行解码(假设客户端传递过来的是 JSON 字符串),然后判断其中是否包含 token
字段,以及 token
值是否有效,并以此为依据判断用户是否通过认证,对于没有认证的用户,不会广播消息给其他客户端,只是告知该用户需要登录才能发送消息。反之,如果用户已经登录,则触发 MessageReceived
事件,并传入消息对象和用户 ID,然后由消息监听器进行后续保存处理,而 WebSocket 服务器则遍历所有建立连接的有效客户端,并将去掉了 Token 字段的消息广播给它们,从而完成聊天消息的一次发送。
注:WebSocket 连接与之前认证使用的 HTTP 连接是不同的连接,所以认证逻辑也是独立的,不能简单通过
Auth
那种方式判断,那一套逻辑仅适用于 HTTP 通信。
用户认证逻辑调整
为此,我们还要调整默认的基于 Token 的用户认证逻辑,当用户注册成功或者登录成功,会更新 users
表的 api_token
字段值,当用户退出时,则清空该字段值,对应的实现代码定义在 app/Http/Controllers/AuthController.php
中:
<?php
namespace App\Http\Controllers;
use App\User;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Auth;
use Illuminate\Support\Str;
use Illuminate\Support\Facades\Hash;
use Illuminate\Support\Facades\Validator;
class AuthController extends Controller
{
public function __construct()
{
$this->middleware('auth:api')->only('logout');
}
public function register(Request $request)
{
// 验证注册字段
Validator::make($request->all(), [
'name' => ['required', 'string', 'max:255'],
'email' => ['required', 'string', 'email', 'max:255', 'unique:users'],
'password' => ['required', 'string', 'min:6']
])->validate();
// 在数据库中创建用户并返回
return User::create([
'name' => $request->input('name'),
'email' => $request->input('email'),
'password' => Hash::make($request->input('password')),
'api_token' => Str::random(60)
]);
}
public function login(Request $request)
{
// 验证登录字段
$request->validate([
'email' => 'required|string',
'password' => 'required|string',
]);
$email = $request->input('email');
$password = $request->input('password');
$user = User::where('email', $email)->first();
// 用户校验成功则返回 Token 信息
if ($user && Hash::check($password, $user->password)) {
$user->api_token = Str::random(60);
$user->save();
return response()->json(['user' => $user, 'success' => true]);
}
return response()->json(['success' => false]);
}
public function logout(Request $request)
{
$user = Auth::guard('auth:api')->user();
$userModel = User::find($user->id);
$userModel->api_token = null;
$userModel->save();
return response()->json(['success' => true]);
}
}
WebSocket 服务器及异步事件监听配置
最后我们来修改 config/laravels.php
配置文件,完成 WebSocket 服务器和异步事件监听配置。
首先通过配置 websocket
启动 WebSocket 并定义通信处理器:
'websocket' => [
'enable' => true,
'handler' => \App\Services\WebSocketHandler::class,
],
异步事件即对应监听器的映射关系在 events
配置项中配置,一个事件可以被多个监听器监听并处理:
'events' => [
\App\Events\MessageReceived::class => [
\App\Listeners\MessageListener::class,
]
],
另外,异步事件的监听和处理是通过 Swoole 的 Task Worker 进程处理的,所以还需要开启 task_worker_num
配置,这里我们使用默认配置即可:
'swoole' => [
...
'task_worker_num' => function_exists('swoole_cpu_num') ? swoole_cpu_num() * 2 : 8,
...
]
对于基于 Swoole HTTP 服务器运行的 Laravel 应用,由于 Laravel 容器会常驻内存,所以在涉及到用户认证的时候,需要在每次请求后清除本次请求的认证状态,以免被其他用户请求冒用,在配置文件 laravels.php
的 cleaners
配置项中取消如下这行配置前的注释即可:
'cleaners' => [
...
\Hhxsv5\LaravelS\Illuminate\Cleaners\AuthCleaner::class, // If you use the authentication or passport in your project, please uncomment this line
...
],
最后我们在 .env
新增如下这两行配置,分别用于指定 Swoole HTTP/WebSocket 服务器运行的 IP 地址和是否后台运行:
LARAVELS_LISTEN_IP=workspace
LARAVELS_DAEMONIZE=true
至此,Laravel 项目这块的后台 WebSocket 服务器编码和配置工作都已经完成了,下面我们要配置 Nginx 让 WebSocket 服务可以对外提供访问。
Nginx 虚拟主机配置
以 Laradock 工作台为例,在 Nginx 中新增虚拟主机 webchats.conf
(laradock/nginx/sites
目录下),用于配置对 Swoole HTTP 服务器和 WebSocket 服务器的支持(Swoole 的 WebSocket 服务器基于 HTTP 服务器,所以两者需要同时配置):
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
upstream webchats {
# Connect IP:Port
server workspace:5200 weight=5 max_fails=3 fail_timeout=30s;
keepalive 16;
}
server {
listen 80;
server_name webchats.test;
root /var/www/webchat/public;
error_log /var/log/nginx/webchats_error.log;
access_log /var/log/nginx/webchats_access.log;
autoindex off;
index index.html index.htm;
# Nginx handles the static resources(recommend enabling gzip), LaravelS handles the dynamic resource.
location / {
try_files $uri @webchats;
}
# Response 404 directly when request the PHP file, to avoid exposing public/*.php
#location ~* \.php$ {
# return 404;
#}
# Http and WebSocket are concomitant, Nginx identifies them by "location"
# !!! The location of WebSocket is "/ws"
# Javascript: var ws = new WebSocket("ws://webchats.test/ws");
# 处理 WebSocket 通信
location ^~ /ws/ {
# proxy_connect_timeout 60s;
# proxy_send_timeout 60s;
# proxy_read_timeout: Nginx will close the connection if the proxied server does not send data to Nginx in 60 seconds; At the same time, this close behavior is also affected by heartbeat setting of Swoole.
# proxy_read_timeout 60s;
proxy_http_version 1.1;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Real-PORT $remote_port;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $http_host;
proxy_set_header Scheme $scheme;
proxy_set_header Server-Protocol $server_protocol;
proxy_set_header Server-Name $server_name;
proxy_set_header Server-Addr $server_addr;
proxy_set_header Server-Port $server_port;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_pass http://webchats;
}
location @webchats {
# proxy_connect_timeout 60s;
# proxy_send_timeout 60s;
# proxy_read_timeout 60s;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Real-PORT $remote_port;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $http_host;
proxy_set_header Scheme $scheme;
proxy_set_header Server-Protocol $server_protocol;
proxy_set_header Server-Name $server_name;
proxy_set_header Server-Addr $server_addr;
proxy_set_header Server-Port $server_port;
proxy_pass http://webchats;
}
}
在本地 hosts
配置文件中映射 webchats.test
域名:
127.0.0.1 webchats.test
接下来,重启 Nginx 容器:
docker-compose up -d nginx
重启 Swoole HTTP 服务器进行验证
在 laradock
目录下通过 docker exec -it laradock_workspace_1 bash
进入 workspace
容器,然后进入 webchat
目录,重启 Swoole HTTP/WebSocket 服务器:
php bin/laravels restart
这样一来,我们就可以通过 webchats.test
访问应用了:
访问成功,则表明基于 Swoole HTTP 服务器驱动的 Laravel 应用配置成功,但是有关本篇教程实现的 WebSocket 通信和用户认证功能并没有得到验证和测试,我们将在下篇教程开始前端页面组件的编写,并通过页面自动化测试来验证后端服务功能是否正常。
12 Comments
dingo和laravels影响,解决:Laravel-s Dingo (Lumen) 支持
这里改成点对点的话,即一对一,不是使用聊天室。fd是固定的?就是心跳的实现。心跳怎么实现?onClose关闭的时候fd不是要去掉吗?