基于 Redis 发布订阅 + Socket.io 实现事件消息广播功能


广播系统概述

前面学院君给大家介绍了 Laravel 底层基于 Redis 列表驱动的消息队列实现原理,以及基于消息队列的事件监听和和处理,今天我们继续来看 Laravel 中另一个可以使用消息队列的场景 —— 事件广播,此外,我们还可以结合 Redis 发布/订阅功能完成广播系统的 Websocket 服务端实现。

所谓广播,其实就是基于 Websocket 协议实现的客户端与服务端双全工通信,不同于传统 HTTP 协议那种被动应答式通信,服务端只有在客户端发起请求才能返回响应数据,在 Websocket 协议中,客户端依然可以发送请求到服务端,服务端也可以主动发送数据到客户端,而无需客户端发起请求,并且支持同时向多个客户端发送数据,就像「广播」一样 —— 大喇叭一喊,所有人都接收到消息了。

显然,通过广播功能可以轻松构建类似在线聊天室、股票行情之类的实时消息系统,往小一点说,也可以用于实时给用户发送提醒消息,无需用户刷新页面发送请求。

关于 Websocket 协议的更多细节以及和 HTTP 协议 之间的区别和联系,可以参考网络协议系列里面从 Ajax 到 WebSocket 这篇教程。

Laravel 自带的广播组件 Websocket 服务端默认是基于 Pusher 提供的,这是一个国外的付费第三方实时消息服务,不适合国内使用,并且学院君这里重点介绍的是基于 Redis 的服务端实现,所以我们将通过官方提供的另一种广播服务端免费解决方案 —— Redis + Socket.io 实现。

广播系统实现流程

在深入探究 Laravel 广播组件功能和底层实现源码之前,我们先通过原生代码实现一个简易版的广播系统,以方便大家更好地了解广播组件的基本原理。在正式开始构建之前,学院君先列出基本实现流程如下:

  1. 在 Laravel 服务端通过 Redis 主动发布消息;
  2. 在 Websocket 服务器(基于 Socket.io 实现)里通过 Redis 订阅功能接收服务端 Redis 发布的消息,再将其广播到所有与之建立连接的 Websocket 客户端(基于 Socket.io 提供的 API 方法);
  3. 在 Websocket 客户端(基于 Socket.io 实现)里监听并接收服务端广播的消息进行处理。

接下来,我们就按照这个流程来实现广播功能。

通过 Redis 发布事件消息

开始之前,假设你已经启动了 Redis 服务器,安装了 PHP Redis 扩展,并配置好了 Laravel 项目的 Redis 连接。

首先我们创建一个 Artisan 命令类:

php artisan make:command RedisPublish

用于通过 Redis 的 PUBLISH 指令在 test-channel 频道发布一条用户注册事件消息,为了简化演示,这里使用一个 PHP 数组模拟事件消息:

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;

class RedisPublish extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'redis:publish';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Redis Publish Message';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return int
     */
    public function handle()
    {
        $data = [
            'event' => 'UserSignedUp',
            'data' => [
                'username' => '学院君'
            ]
        ];
        Redis::publish('test-channel', json_encode($data));
    }
}

编写 Websocket 服务端实现代码

对于 Redis 驱动的广播系统,由于 Redis 本身并不能提供完整的 Websocket 服务器实现,所以需要借助其他的 Websocket 服务端实现做补充,这里我们选择 Socket.io,它是一个 JavaScript 实现的、基于 Websocket 协议的开源 Client-Server 实时通信库,既提供了服务端实现,也提供了客户端实现。

要构建 Websocket 服务端,需要先安装 socket.io 服务端依赖,同时还要引入 ioredis 依赖以便通过 Redis 订阅 Laravel 服务端基于 Redis 发布的事件消息,Redis 的发布/订阅功能正是用于这里,可以看到这是一个异构系统,Redis 发布位于 Laravel 应用,Redis 订阅位于 JavaScript 应用,以及 http 依赖用于启动 HTTP 服务器(Websocket 通信需要建立在 HTTP 通信之上):

npm install --save socket.io ioredis http

在项目根目录下创建 websocket.js,编写简单的 Websocket 服务器实现代码如下:

var server = require('http').Server();
var io = require('socket.io')(server);

var Redis = require('ioredis');
var redis = new Redis({
    host: 'redis',
    port: 6379
});

redis.subscribe('laravel_database_test-channel');
redis.on('message', function (channel, message) {
    console.log(channel, message);
    message = JSON.parse(message);
    io.emit(channel + ":" + message.event, message.data);
});

server.listen(3000, () => {
    console.log('http server started and listen on 3000.');
});

在这段代码中,我们通过 HTTP 服务器监听客户端请求,并在此基础上进行 Websocket 握手和连接建立,然后将客户端 Redis 与服务端 Redis 建立连接并通过 SUBSCRIBE 指令订阅 laravel_database_test-channel 频道(laravel_database_ 是 Laravel Redis 数据库的默认前缀),一旦 Redis 服务端在这个频道发送了消息(比如执行了 redis:publish 命令),就能通过 redis.on 接收到事件消息数据,接着在闭包回调中通过 io.emit 按照 Socket.io 约定的格式进行处理后将其广播给 Socket.io 客户端。

为了方便大家了解底层执行原理,这里给出了原生代码的实现,你还可以使用社区提供的 Laravel Echo Server 作为服务端 Websocket 实现,其底层实现的基本原理和我们这里一样。

你可以通过如下命令启动这个 Websocket 服务器:

sail node websocket.js

-w686

学院君这里使用了 Laravel Sail 作为本地开发环境,对应其他环境,在相应环境中通过 node websocket.js 启动即可。还是以 Sail 为例,我们需要将应用容器中的 3000 端口暴露出来,才可以被客户端访问,在 .env 中新增一个 WEBSOCKET_PORT 配置:

WEBSOCKET_PORT=3000

然后在 docker-compose.yml 中为 redis.test 配置端口映射:

services:
    redis.test:
        ...
        ports:
            - '${APP_PORT:-80}:80'
            - '${WEBSOCKET_PORT}:3000'

将 Websocket 服务器的端口 3000 暴露出来以便被外部访问。重启 redis.test 容器让修改生效,并再次启动 Websocket 服务器:

sail down
sail up -d
sail node websocket.js

编写 Websocket 客户端实现代码

最后,为了广播系统链路的完整性,还要准备 Websocket 客户端以便接收服务端广播的消息并进行处理。

你当然可以使用 Laravel 官方提供的 Laravel Echo 库作为 Websocket 客户端,不过为了和 Websocket 服务端匹配,我们这里使用原生的 socket.io-client 依赖实现,以便大家更好地理解底层实现原理。

使用 NPM 安装这个库:

npm install --save socket.io-client

然后在 resources/js/bootstrap.js 中新增如下代码:

const io = require('socket.io-client');

const socket = io(window.location.hostname + ':3000');

socket.on('laravel_database_test-channel:UserSignedUp', data => {
    console.log(data.username);
});

这里我们建立了与 Websocket 服务端的连接,再通过 socket.on 指定监听的频道和事件,最后通过闭包打印事件负荷数据,这是一个非常简单的 Websocket CS 实现,更多 Socket.io 的使用细节,请参考其官方文档,毕竟这不是我们这里关注的重点。

测试事件消息广播功能

到这里,我们就完成了广播系统的服务端和客户端简单实现,接下来我们来验证下服务端发布消息后,是否可以广播到客户端。

运行 npm run dev 重新编译前端代码,在 resources/views 目录下新建一个 websocket.blade.php 用于测试的视图模板文件:

<!DOCTYPE html>
<html lang="{{ str_replace('_', '-', app()->getLocale()) }}">
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">

    <title>Laravel Websocket</title>

    <!-- Fonts -->
    <link href="https://fonts.googleapis.com/css2?family=Nunito:wght@400;600;700&display=swap" rel="stylesheet">

    <link href="{{ asset('css/app.css') }}" rel="stylesheet">
</head>
<body class="antialiased">
    <h1>Broadcast Test</h1>
</body>
<script src="{{ asset('js/app.js') }}" type="text/javascript"></script>
</html>

由于引入了 app.js,所以会执行定义在 bootstrap.js 中的广播事件消息接收和处理代码。在 routes/web.php 中注册一个对应的路由:

Route::get('/broadcast', function () {
    return view('websocket');
});

在浏览器中访问 http://redis.test/broadcast

-w1264

没有成功建立 Websocket 连接,而是报 CORS 错误,为了解决这个问题,需要到 websocket.js 中设置 Websocket 服务器的 CORS 策略,允许来自 redis.test 域名的 GET 请求:

var io = require('socket.io')(server, {
    cors: {
        origin: "http://redis.test",
        methods: ["GET", "POST"]
    }
});

重启 Websocket 服务器,这个时候就可以看到 Websocket 连接建立成功了:

-w1054

Websocket 连接如何建立的细节可以参考网络协议部分从 Ajax 到 Websocket 这篇教程,这里就不再赘述了。

接下来,我们再开启一个访问 http://redis.test/broadcast 的浏览器窗口,以便测试服务端消息是否同时广播到多个客户端了。然后运行如下 Artisan 命令基于 Redis 发布消息:

sail artisan redis:publish

在 Websocket 服务端日志输出中,可以看到 Redis 订阅客户端已经接收到服务端发布的消息:

-w999

再看两个浏览器窗口,在 Websocket 消息流中,可以看到 Websocket 服务端广播事件消息到客户端的记录:

-w1398

再看浏览器 Console 标签页,两个浏览器窗口都打印出了「学院君」,说明客户端已经成功接收到服务端广播的消息:

-w1236

小结

至此,我们就基于 Redis 的发布/订阅功能,结合 Socket.io 实现了简单的事件广播功能。

这篇教程偏底层基本原理,下篇教程,学院君将结合事件广播 + Redis 消息队列 + Laravel Echo Server + Laravel Echo 更系统更全面地介绍 Laravel 广播组件的所有高阶功能使用。


Vote Vote Cancel Collect Collect Cancel

<< 上一篇: 基于 Redis 消息队列实现 Laravel 事件监听及底层源码探究

>> 下一篇: 基于 Redis 实现 Laravel 广播功能(上):广播事件分发和底层源码探究