基于 Redis 实现 Laravel 广播功能(上):广播事件分发和底层源码探究


上篇教程中,学院君给大家演示了如何通过 Redis + Socket.io 实现事件消息广播功能,这是一个非常简单的实现,目的在于帮助大家熟悉实时消息广播的底层流程,今天这篇教程,我们将结合 Laravel 生态提供的广播组件和前端技术栈来搭建一个生产环境可用的、更加系统的实时消息系统。

这里使用的技术栈是基于 Redis 驱动的 Laravel 广播组件 + 封装了 Socket.io 服务端的 Laravel Echo Server + 封装了 Socket.io 客户端的 Laravel Echo,底层的基本流程其实还是和上篇教程所演示的一样,只是在其基础上封装了更复杂的业务功能,下面我们先来搭建这个广播系统并分析其底层实现源码,再演示上层支持的各种业务功能。

Laravel 后端配置

要使用 Laravel 提供的广播组件,需要在 config/app.php 中取消 BroadcastServiceProvider 前面的注释:

'providers' => [
    ...
    
    App\Providers\BroadcastServiceProvider::class,
    
    ...
],

以便可以在应用启动时加载广播相关路由:

public function boot()
{
    Broadcast::routes();

    require base_path('routes/channels.php');
}

channels.php 中的路由和 web.php 中的路由不同,前者是基于 Websocket 协议进行通信的,后者是基于 HTTP 协议进行通信的。

和缓存、队列一样,广播也支持多种驱动,比如 Pusher、Redis,我们可以在 .env 通过设置 BROADCAST_DRIVER 来配置广播驱动,这里将其配置为 Redis:

BROADCAST_DRIVER=redis

至此,服务端配置工作就完成了。

定义广播事件类

Laravel 支持通过分发广播事件的方式来发布消息(上篇教程我们通过数组模拟了事件消息),要创建广播事件,使用如下 Artisan 命令即可:

php artisan make:event UserSignedUp

如果要让 Laravel 分发事件时以广播形式推送,需要让其实现 ShouldBroadcast 接口,我们编写 UserSignedUp 这个广播事件类实现如下:

<?php

namespace App\Events;

use App\Models\User;
use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class UserSignedUp implements ShouldBroadcast
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public User $user;
    public string $broadcastQueue = 'broadcast';

    /**
     * Create a new event instance.
     *
     * @param User $user
     */
    public function __construct(User $user)
    {
        $this->user = $user;
    }

    /**
     * Get the channels the event should broadcast on.
     *
     * @return Channel|array
     */
    public function broadcastOn()
    {
        return new Channel('test-channel');
    }
}

我们将上篇教程中以数组形式模拟的事件消息数据转化为了广播事件类,事件负荷数据通过属性形式设置,并且在 broadcastOn 方法中定义了事件消息将被推送到的频道,以及通过 broadcastQueue 属性指定了事件消息如果被推送到队列的话对应的队列名称。

广播事件类和普通的事件类基本结构是一样的,只是在其基础上实现了 ShouldBroadcast 接口表示这是个广播事件,然后通过 broadcastOn 方法定义了广播频道,你可以基于 InteractsWithSockets 提供的方法进行一些 Websocket 设置,还可以定义一些其他的方法和属性用于设置该事件的广播和推送到消息队列的行为,这些方法和属性稍后会在事件分发底层实现中看到。

广播事件分发及底层实现

和普通事件类一样,广播事件也要通过分发进行处理。我们可以在应用的任何地方分发广播事件,为了简化演示,我们将上篇教程编写的 RedisPublish 命令执行代码改为分发广播事件:

public function handle()
{
    $user = \App\Models\User::find(1);
    event(new UserSignedUp($user));
}

和普通事件类不同的是,广播事件无需注册对应的事件监听器定义处理逻辑,如果实现了 ShouldBroadcast 接口分发广播事件会将其推送到 Laravel 当前使用的消息队列系统进行异步处理,如果实现了 ShouldBroadcastNow 接口则立即广播这个事件,如果没有实现这些接口就不是广播事件,按照普通事件类处理。

我们来看看广播事件分发的底层实现,和普通事件一样,最终也是通过 Illuminate\Events\Dispatcherdispatch 分发处理的,我们注意到其中包含这段广播事件处理代码:

if ($this->shouldBroadcast($payload)) {
    $this->broadcastEvent($payload[0]);
}

$payload 是通过数组包裹的传入 dispatch 方法的事件实例数据,因此 $payload[0] 也就是事件实例本身了,这里的 shouldBroadcast 方法用于判断当前事件是否需要广播,判断依据如下:

这个事件实例是否实现了 ShouldBroadcast 接口,以及如果事件类中定义了 broadcastWhen 方法,条件是否为 true(没有定义的话默认返回为 true),这两个条件同时满足才会广播,对应的实现源码位于 shouldBroadcast 方法中,非常简单,源码就不贴出来了。

如果需要广播,则调用 broadcastEvent 方法广播这个事件:

protected function broadcastEvent($event)
{
    $this->container->make(BroadcastFactory::class)->queue($event);
}

里面的这行代码最终会调用 Illuminate\Broadcasting\BroadcastManagerqueue 方法(相关服务容器绑定和别名设置位于 Illuminate\Broadcasting\BroadcastServiceProviderregister 方法中)进行广播事件的处理:

public function queue($event)
{
    if ($event instanceof ShouldBroadcastNow) {
        return $this->app->make(BusDispatcherContract::class)->dispatchNow(new BroadcastEvent(clone $event));
    }

    $queue = null;

    if (method_exists($event, 'broadcastQueue')) {
        $queue = $event->broadcastQueue();
    } elseif (isset($event->broadcastQueue)) {
        $queue = $event->broadcastQueue;
    } elseif (isset($event->queue)) {
        $queue = $event->queue;
    }

    $this->app->make('queue')->connection($event->connection ?? null)->pushOn(
        $queue, new BroadcastEvent(clone $event)
    );
}

立即广播事件消息

如果广播事件实现了 ShouldBroadcastNow 接口,则通过 Illuminate\Bus\DispatcherdispatchNow 方法立即进行处理,最终执行的是 BroadcastEvent 实例的 handle 方法将其进行广播:

public function handle(Broadcaster $broadcaster)
{
    $name = method_exists($this->event, 'broadcastAs')
            ? $this->event->broadcastAs() : get_class($this->event);

    $broadcaster->broadcast(
        Arr::wrap($this->event->broadcastOn()), $name,
        $this->getPayloadFromEvent($this->event)
    );
}

事件名默认是类名,如果事件类定义了 broadcastAs 方法,则以其返回值作为事件名。接下来,就是调用 $broadcaster->broadcast 方法广播这个事件了,$broadcaster 是通过依赖注入传递进来的广播驱动实现实例,其服务绑定代码位于 Illuminate\Broadcasting\BroadcastServiceProviderregister 方法中:

$this->app->singleton(BroadcasterContract::class, function ($app) {
    return $app->make(BroadcastManager::class)->connection();
});

也就是调用 BroadcastManager 实例的 connection 方法返回的广播驱动连接实例,我们这里的 BROADCAST_DRIVER 配置值为 redis,所以最终调用 createRedisDriver 返回 RedisBroadcaster 实例作为广播驱动实例。所以上面的 $broadcaster->broadcast 最终执行的是 RedisBroadcasterbroadcast 方法:

-w756

第一个参数是频道,以 UserSignedUp 事件为例,就是通过 broadcastOn 方法返回的 test-channel,频道参数不能为空,否则会退出,第二个参数是事件名,第三个参数是事件负荷数据,也就是基于 BroadcastEventgetPayloadFromEvent 处理后的事件消息数据,其中包含事件本身的属性数据。

broadcast 方法中,会将事件名和事件负荷数据一起封装到最终的 $payload 中,然后通过 Redis 连接,通过 PUBLISH 指令发布这个事件消息(在 broadcastMultipleChannelsScript 方法中包含了相应的 LUA 脚本)。

你看,经历这么多重重「关卡」,最终落地的还不是一个 Redis PUBLISH 指令。如果在 Websocket 服务器中通过 Redis 订阅了 test-channel 这个频道,就可以接收到这个消息,然后将其广播给所有建立连接的 Websocket 客户端了。

将事件消息推送到队列

不过细心的同学可能已经注意到 Illuminate\Events\DispatchershouldBroadcast 方法并没有针对是否实现 ShouldBroadcastNow 接口做判断,因此目前 Laravel 只是底层支持了立即广播事件消息,上层业务是不支持的,所以回到 Illuminate\Broadcasting\BroadcastManagerqueue 方法,我们继续往下看:

$queue = null;

if (method_exists($event, 'broadcastQueue')) {
    $queue = $event->broadcastQueue();
} elseif (isset($event->broadcastQueue)) {
    $queue = $event->broadcastQueue;
} elseif (isset($event->queue)) {
    $queue = $event->queue;
}

$this->app->make('queue')->connection($event->connection ?? null)->pushOn(
    $queue, new BroadcastEvent(clone $event)
);

接下来,就是将事件消息推送到队列系统的操作了,首先获取队列名称,如果事件类定义了 broadcastQueue 方法,则将其返回值作为队列名称,否则使用事件实例上的 broadcastQueue 或者 queue 属性值作为队列名称,如果以上都没有设置,则只能使用默认的 default 作为队列名称了,这里我们设置了 broadcastQueue 属性,所以会被推送到 broadcast 这个队列。

最后,就是调用队列连接(根据当前配置,默认使用的是 Redis 连接,你也可以通过在事件类中设置 connection 属性指定其他队列连接)的 pushOn 方法推送封装了当前事件的 BroadcastEvent 实例到队列系统了,最终执行的就是位于 RedisQueue 中的 push 方法,我们前面介绍队列系统时已经详细介绍过这块的底层实现,这里就不再重复了。

基于前面事件监听和处理的底层实现分析,我们也可以预判,当启动队列处理器处理 broadcast 队列时,会按照上面立即广播事件消息的方式,基于 Illuminate\Bus\DispatcherdispatchNow 方法处理 BroadcastEvent,即执行其 handle 方法通过 RedisBroadcasterbroadcast 方法使用 Redis PUBLISH 指令发布消息。

所以虽然广播事件没有定义显式的事件监听器,但是底层其实是通过 BroadcastEvent 作为统一的广播事件监听器来处理所有广播事件的。所以啊,广播事件的处理是 Laravel 框架事件监听和消息队列的集大成者,了解它的底层实现,也就等于搞懂了所有这几个组件的实现原理。


Vote Vote Cancel Collect Collect Cancel

<< 上一篇: 基于 Redis 发布订阅 + Socket.io 实现事件消息广播功能

>> 下一篇: 基于 Redis 实现 Laravel 广播功能(中):引入 Laravel Echo 接收广播消息