基于 Swoole 开发实时在线聊天室(三):后台 WebSocket 服务器实现


今天我们继续聊天室项目后台功能的开发,现在,后端数据库已经就绪,基于 API Token 的用户认证功能已经实现,接下来,我们来实现聊天室功能的核心 —— WebSocket 服务器相关业务逻辑的实现。

创建 WebSocketHandler

首先我们在 app/Services 目录下创建用于处理 WebSocket 通信的 WebSocketHandler.php,并初始化 WebSocketHandler 类代码如下:

<?php
namespace App\Services;

use Hhxsv5\LaravelS\Swoole\WebSocketHandlerInterface;
use Illuminate\Support\Facades\Log;
use Swoole\Http\Request;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;

class WebSocketHandler implements WebSocketHandlerInterface
{
    public function __construct()
    {
        // 构造函数即使为空,也不能省略
    }

    // 连接建立时触发
    public function onOpen(Server $server, Request $request)
    {
        Log::info('WebSocket 连接建立:' . $request->fd);
    }

    // 收到消息时触发
    public function onMessage(Server $server, Frame $frame)
    {
        // $frame->fd 是客户端 id,$frame->data 是客户端发送的数据
        Log::info("从 {$frame->fd} 接收到的数据: {$frame->data}");
        foreach ($server->connections as $fd) {
            if (!$server->isEstablished($fd)) {
                // 如果连接不可用则忽略
                continue;
            }
            $server->push($fd, $frame->data); // 服务端通过 push 方法向所有客户端广播消息
        }
    }

    // 连接关闭时触发
    public function onClose(Server $server, $fd, $reactorId)
    {
        Log::info('WebSocket 连接关闭:' . $fd);
    }
}

该类实现了 WebSocketHandlerInterface 接口,所以必须实现接口约定的构造函数、onOpenonMessage 以及 onClose 方法,每个方法的具体功能我们在前面在 Laravel 中集成 Swoole 实现 WebSocket 服务器已经介绍过,在建立连接和断开连接时,我们只是打印一条日志记录,在接收到来自客户端的消息时,我们会记录该消息并将其广播给每个与该 WebSocket 服务器连接的客户端(对应逻辑定义在 onMessage 方法中)。

异步事件监听与处理

当然,我们在构建聊天室项目时,实现的业务功能要比前面实现的弹幕功能要复杂,除了将消息广播给所有客户端之外,还要保存消息到数据库,而且还会校验用户是否登录,未登录用户不能发送消息,下面我们先来处理消息保存到数据库的实现。

由于操作数据库是一个涉及到网络 IO 的耗时操作,所以这里我们通过 Swoole 提供的异步事件监听机制将其转交给 Task Worker 去处理,从而提高 WebSocket 服务器的通信性能。

首先,借助 Laravel 的 Artisan 命令创建消息接收事件 MessageReceived

php artisan make:event MessageReceived

然后修改生成的 app/Events/MessageReceived.php 代码如下:

<?php
namespace App\Events;

use App\Message;
use Hhxsv5\LaravelS\Swoole\Task\Event;

class MessageReceived extends Event
{
    private $message;
    private $userId;

    /**
     * Create a new event instance.
     */
    public function __construct($message, $userId = 0)
    {
        $this->message = $message;
        $this->userId = $userId;
    }

    /**
     * Get the message data
     * 
     * return App\Message
     */
    public function getData()
    {
        $model = new Message();
        $model->room_id = $this->message->room_id;
        $model->msg = $this->message->type == 'text' ? $this->message->content : '';
        $model->img = $this->message->type == 'image' ? $this->message->image : '';
        $model->user_id = $this->userId;
        $model->created_at = Carbon::now();
        return $model;
    }
}

这个事件类只是对传入的数据进行格式转化,这里我们从外部传入消息对象和用户ID,然后通过 getData 方法将其组合为 Message 模型实例并返回。

由于 Message 模型只包含创建时间,不包含更新时间,所以我们显式指定了 created_at 字段,另外还要将 Message 模型类的 $timestamps 属性设置为 false,以避免系统自动为其设置时间字段。

class Message extends Model
{
    public $timestamps = false;
}

然后,创建消息监听器 MessageListener 对上述 MessageReceived 事件进行处理:

php artisan make:listener MessageListener

修改最新生成的 app/Listeners/MessageListener.php 文件代码如下:

<?php

namespace App\Listeners;

use App\Events\MessageReceived;
use Hhxsv5\LaravelS\Swoole\Task\Listener;
use Illuminate\Support\Facades\Log;

class MessageListener extends Listener
{
    /**
     * Create the event listener.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Handle the event.
     *
     * @param  MessageReceived  $event
     * @return void
     */
    public function handle($event)
    {
        $message = $event->getData();
        Log::info(__CLASS__ . ': 开始处理', $message->toArray());
        if ($message && $message->user_id && $message->room_id && ($message->msg || $message->img)) {
            $message->save();
            Log::info(__CLASS__ . ': 处理完毕');
        } else {
            Log::error(__CLASS__ . ': 消息字段缺失,无法保存');
        }
    }
}

在消息监听器中,通过 handle 方法对事件进行处理,传入的 $event 参数对应上述 MessageReceived 对象实例,然后我们在该方法中对消息数据进行校验和保存,同时打印相应的日志信息。

用户认证校验和消息接收事件触发

有了消息接收事件和消息事件监听器后,接下来,我们需要在 WebSocket 服务器收到消息时触发消息接收事件,这个业务逻辑可以在 WebSocketHandleronMessage 方法中完成,修改 onMessage 实现代码如下:

use App\Events\MessageReceived;
use Hhxsv5\LaravelS\Swoole\Task\Event; 
use App\User;

// 收到消息时触发
public function onMessage(Server $server, Frame $frame)
{
    // $frame->fd 是客户端 id,$frame->data 是客户端发送的数据
    Log::info("从 {$frame->fd} 接收到的数据: {$frame->data}");
    $message = json_decode($frame->data);
    // 基于 Token 的用户认证校验
    if (empty($message->token) || !($user = User::where('api_token', $message->token)->first())) {
        Log::warning("用户" . $message->name . "已经离线,不能发送消息");
        $server->push($frame->fd, "离线用户不能发送消息");  // 告知用户离线状态不能发送消息
    } else {
        // 触发消息接收事件
        event = new MessageReceived($message, $user->id);
        Event::fire($event);
        unset($message->token);  // 从消息中去掉当前用户令牌字段
        foreach ($server->connections as $fd) {
            if (!$server->isEstablished($fd)) {
                // 如果连接不可用则忽略
                continue;
            }
            $server->push($fd, json_encode($message)); // 服务端通过 push 方法向所有连接的客户端发送数据
        }
    }
}

在这里,我们首先对接收到的数据进行解码(假设客户端传递过来的是 JSON 字符串),然后判断其中是否包含 token 字段,以及 token 值是否有效,并以此为依据判断用户是否通过认证,对于没有认证的用户,不会广播消息给其他客户端,只是告知该用户需要登录才能发送消息。反之,如果用户已经登录,则触发 MessageReceived 事件,并传入消息对象和用户 ID,然后由消息监听器进行后续保存处理,而 WebSocket 服务器则遍历所有建立连接的有效客户端,并将去掉了 Token 字段的消息广播给它们,从而完成聊天消息的一次发送。

注:WebSocket 连接与之前认证使用的 HTTP 连接是不同的连接,所以认证逻辑也是独立的,不能简单通过 Auth 那种方式判断,那一套逻辑仅适用于 HTTP 通信。

用户认证逻辑调整

为此,我们还要调整默认的基于 Token 的用户认证逻辑,当用户注册成功或者登录成功,会更新 users 表的 api_token 字段值,当用户退出时,则清空该字段值,对应的实现代码定义在 app/Http/Controllers/AuthController.php 中:

<?php

namespace App\Http\Controllers;

use App\User;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Auth;
use Illuminate\Support\Str;
use Illuminate\Support\Facades\Hash;
use Illuminate\Support\Facades\Validator;

class AuthController extends Controller
{
    public function __construct()
    {
        $this->middleware('auth:api')->only('logout');
    }

    public function register(Request $request) 
    {
        // 验证注册字段
        Validator::make($request->all(), [
            'name' => ['required', 'string', 'max:255'],
            'email' => ['required', 'string', 'email', 'max:255', 'unique:users'],
            'password' => ['required', 'string', 'min:6']
        ])->validate();

        // 在数据库中创建用户并返回
        return User::create([
            'name' => $request->input('name'),
            'email' => $request->input('email'),
            'password' => Hash::make($request->input('password')),
            'api_token' => Str::random(60)
        ]);
    }

    public function login(Request $request) 
    {
        // 验证登录字段
        $request->validate([
            'email' => 'required|string',
            'password' => 'required|string',
        ]);

        $email = $request->input('email');
        $password = $request->input('password');
        $user = User::where('email', $email)->first();
        // 用户校验成功则返回 Token 信息
        if ($user && Hash::check($password, $user->password)) {
            $user->api_token = Str::random(60);
            $user->save();
            return response()->json(['user' => $user, 'success' => true]);
        }

        return  response()->json(['success' => false]);
    }

    public function logout(Request $request) 
    {
        $user = Auth::guard('auth:api')->user();
        $userModel = User::find($user->id);
        $userModel->api_token = null;
        $userModel->save();
        return response()->json(['success' => true]);
    }
}

WebSocket 服务器及异步事件监听配置

最后我们来修改 config/laravels.php 配置文件,完成 WebSocket 服务器和异步事件监听配置。

首先通过配置 websocket 启动 WebSocket 并定义通信处理器:

'websocket'                => [
    'enable' => true,
    'handler' => \App\Services\WebSocketHandler::class,
],

异步事件即对应监听器的映射关系在 events 配置项中配置,一个事件可以被多个监听器监听并处理:

'events'                   => [
    \App\Events\MessageReceived::class => [
        \App\Listeners\MessageListener::class,
    ]
],

另外,异步事件的监听和处理是通过 Swoole 的 Task Worker 进程处理的,所以还需要开启 task_worker_num 配置,这里我们使用默认配置即可:

'swoole'                   => [
    ...
    'task_worker_num'    => function_exists('swoole_cpu_num') ? swoole_cpu_num() * 2 : 8,
    ...
]

对于基于 Swoole HTTP 服务器运行的 Laravel 应用,由于 Laravel 容器会常驻内存,所以在涉及到用户认证的时候,需要在每次请求后清除本次请求的认证状态,以免被其他用户请求冒用,在配置文件 laravels.phpcleaners 配置项中取消如下这行配置前的注释即可:

'cleaners'                 => [
    ...
    \Hhxsv5\LaravelS\Illuminate\Cleaners\AuthCleaner::class,    // If you use the authentication or passport in your project, please uncomment this line
    ...
],

最后我们在 .env 新增如下这两行配置,分别用于指定 Swoole HTTP/WebSocket 服务器运行的 IP 地址和是否后台运行:

LARAVELS_LISTEN_IP=workspace
LARAVELS_DAEMONIZE=true

至此,Laravel 项目这块的后台 WebSocket 服务器编码和配置工作都已经完成了,下面我们要配置 Nginx 让 WebSocket 服务可以对外提供访问。

Nginx 虚拟主机配置

以 Laradock 工作台为例,在 Nginx 中新增虚拟主机 webchats.conflaradock/nginx/sites目录下),用于配置对 Swoole HTTP 服务器和 WebSocket 服务器的支持(Swoole 的 WebSocket 服务器基于 HTTP 服务器,所以两者需要同时配置):

map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      close;
}

upstream webchats {
    # Connect IP:Port
    server workspace:5200 weight=5 max_fails=3 fail_timeout=30s;
    keepalive 16;
}

server {
    listen 80;

    server_name webchats.test;
    root /var/www/webchat/public;

    error_log /var/log/nginx/webchats_error.log;
    access_log /var/log/nginx/webchats_access.log;

    autoindex off;
    index index.html index.htm;

    # Nginx handles the static resources(recommend enabling gzip), LaravelS handles the dynamic resource.
    location / {
        try_files $uri @webchats;
    }

    # Response 404 directly when request the PHP file, to avoid exposing public/*.php
    #location ~* \.php$ {
    #    return 404;
    #}

    # Http and WebSocket are concomitant, Nginx identifies them by "location"
    # !!! The location of WebSocket is "/ws"
    # Javascript: var ws = new WebSocket("ws://webchats.test/ws");
    # 处理 WebSocket 通信
    location ^~ /ws/ {
        # proxy_connect_timeout 60s;
        # proxy_send_timeout 60s;
        # proxy_read_timeout: Nginx will close the connection if the proxied server does not send data to Nginx in 60 seconds; At the same time, this close behavior is also affected by heartbeat setting of Swoole.
        # proxy_read_timeout 60s;
        proxy_http_version 1.1;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Real-PORT $remote_port;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_set_header Scheme $scheme;
        proxy_set_header Server-Protocol $server_protocol;
        proxy_set_header Server-Name $server_name;
        proxy_set_header Server-Addr $server_addr;
        proxy_set_header Server-Port $server_port;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_pass http://webchats;
    }

    location @webchats {
        # proxy_connect_timeout 60s;
        # proxy_send_timeout 60s;
        # proxy_read_timeout 60s;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Real-PORT $remote_port;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_set_header Scheme $scheme;
        proxy_set_header Server-Protocol $server_protocol;
        proxy_set_header Server-Name $server_name;
        proxy_set_header Server-Addr $server_addr;
        proxy_set_header Server-Port $server_port;
        proxy_pass http://webchats;
    }
}

在本地 hosts 配置文件中映射 webchats.test 域名:

127.0.0.1 webchats.test 

接下来,重启 Nginx 容器:

docker-compose up -d nginx

重启 Swoole HTTP 服务器进行验证

laradock 目录下通过 docker exec -it laradock_workspace_1 bash 进入 workspace 容器,然后进入 webchat 目录,重启 Swoole HTTP/WebSocket 服务器:

php bin/laravels restart

这样一来,我们就可以通过 webchats.test 访问应用了:

访问成功,则表明基于 Swoole HTTP 服务器驱动的 Laravel 应用配置成功,但是有关本篇教程实现的 WebSocket 通信和用户认证功能并没有得到验证和测试,我们将在下篇教程开始前端页面组件的编写,并通过页面自动化测试来验证后端服务功能是否正常。


点赞 取消点赞 收藏 取消收藏

<< 上一篇: 基于 Swoole 开发实时在线聊天室(二):后台数据库准备和 API 认证功能实现

>> 下一篇: 基于 Swoole 开发实时在线聊天室(四):前端资源初始化