基于 Redis 消息队列实现 Laravel 事件监听及底层源码探究
在 Laravel 中,除了使用 dispatch
辅助函数通过 Illuminate\Bus\Dispatcher
显式推送队列任务外,还可以通过事件监听的方式隐式进行队列任务推送,在这个场景下,事件监听器实际上扮演了「任务类」的角色。
还是以文章浏览数更新为例。开始之前,我们先来给大家演示下事件监听和处理的基本实现。
事件监听基本使用
首先创建一个文章浏览事件类 PostViewed
:
sail artisan make:event PostViewed
然后编写这个事件类代码如下:
<?php
namespace App\Events;
use App\Models\Post;
use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class PostViewed
{
use Dispatchable, InteractsWithSockets, SerializesModels;
public Post $post;
/**
* Create a new event instance.
*
* @param Post $post
*/
public function __construct(Post $post)
{
$this->post = $post;
}
/**
* Get the channels the event should broadcast on.
*
* @return Channel|array
*/
public function broadcastOn()
{
return new PrivateChannel('channel-name');
}
}
事件类的作用就是装载事件相关的数据,这里我们引入了 Post
模型实例,以便在事件监听器中进行相应的处理,事件类中默认还有一个 broadcastOn
表示事件的广播通道,我们在后面介绍广播时再详细介绍这个方法。
有了事件之后,还要创建一个监听这个事件的处理器:
php artisan make:listener IncreasePostViews
编写处理器代码如下:
<?php
namespace App\Listeners;
use App\Events\PostViewed;
use Illuminate\Support\Facades\Redis;
class IncreasePostViews
{
/**
* Create the event listener.
*
* @return void
*/
public function __construct()
{
//
}
/**
* Handle the event.
*
* @param PostViewed $event
* @return void
*/
public function handle(PostViewed $event)
{
if ($event->post->increment('views')) {
Redis::zincrby('popular_posts', 1, $event->post->id);
}
}
}
我们将之前队列任务类的 handle
方法代码搬到了事件监听器的 handle
方法中,作为文章浏览事件发生时的处理逻辑。
要建立事件与监听器之间的映射关系,保证事件发生时可以通过监听器对其进行处理,需要在 EventServiceProvider
中维护一个监听数组配置:
protected $listen = [
...
PostViewed::class => [
IncreasePostViews::class
],
];
以事件做键,事件监听器做值,表示一个事件可以同时被多个事件监听器监听和处理。
Laravel 还提供了事件自动发现功能,不过考虑到反射性能较差,我们这里还是使用传统的手动注册方式。
这样一来,当我们在 PostController
的 show
方法中触发 PostViewed
事件时:
// 浏览文章
public function show($id)
{
$post = $this->postRepo->getById($id);
// 触发文章浏览事件
event(new PostViewed($post));
return "Show Post #{$post->id}, Views: {$post->views}";
}
就会触发监听该事件的所有处理器类执行 handle
方法处理这个事件,默认情况下,事件监听器是同步执行的,所以你可以立即看到文章浏览数被更新:
基于队列处理事件监听
这只是一个更新单条数据库记录的事件处理,如果是耗时操作,比如网络请求、邮件发送、大的数据库事务等,同步处理事件监听会导致这个页面浏览要加载很长时间,降低用户体验和系统负载,所以 Laravel 还支持将事件处理推送到消息队列异步处理,提升系统性能,优化用户体验。
要让事件处理自动推送到消息队列,只需要让对应的事件监听器类和队列任务类一样实现 ShouldQueue
接口即可,为了方便与队列系统交互,你还可以使用 InteractsWithQueue
Trait(这一步不是必须的):
...
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
class IncreasePostViews implements ShouldQueue
{
use InteractsWithQueue;
...
}
如果你在创建之初就已经明确知道这个事件监听器的处理操作会推送到队列,可以在创建事件监听器的时候使用
--queued
选项:php artisan make:listener IncreasePostViews
。
其他代码不用做任何调整,这样,当事件触发时,对于这个实现了 ShouldQueue
接口的监听器,Laravel 会自动将其作为「任务类」推送到消息队列(默认连接、默认队列名称),如果你想要自定义队列连接、队列名称、延迟和重试之类的配置,可以像在队列任务类中一样在这个监听器类中定义 connection
、queue
、delay
、tries
等属性:
public string $queue = 'events';
这个时候,当你刷新浏览器中的文章浏览页面时,就会发现不再执行文章浏览数更新操作了,说明这个处理操作被推送到队列系统了:
你可以在 Redis 队列 laravel_database_queues:events
中看到对应的消息数据:
这个消息数据对应的 JSON 数据如下:
其中的 data.command
反序列化后的结果如下:
其实就是 IncreasePostViews
监听器类,可以看到这些数据结构和消息队列一模一样,所以可以大胆猜测它们底层共用了同一套代码。
为了让 events
队列中的事件监听器被处理掉,运行如下命令启动消息队列处理进程:
你可以到数据库中验证 posts.id = 88
的记录,如果 views
字段值等于 97,则表明文章浏览事件被成功处理。
底层实现源码
为了一探事件监听和处理的底层实现原理,我们到 Laravel 底层查看相关的源码实现。
注册事件与对应的监听器处理逻辑
在 Laravel 应用启动过程中,会调用 App\Providers\EventServiceProvider
的 register
方法基于 listen
数组注册事件和监听器的映射关系:
这里的 Event
门面是在 Illuminate\Events\EventServiceProvider
的 register
方法中注册的 events
服务的代理:
Event::listen
调用的就是 Dispatcher
类的 listen
方法,需要注意的是这里的 Dispatcher
对应着 Illuminate\Events\Dispatcher
类,而不是队列任务分发时调用的 Illuminate\Bus\Dispatcher
类。这两个类不是同一个类,也分别实现了不同接口。
在初始化 Illuminate\Events\Dispatcher
实例时还通过 setQueueResolver
方法基于闭包函数设置了队列服务实例,如果事件处理要推送到队列,则使用这个服务实例进行操作,该闭包函数返回的服务实例正是 QueueManager
对象实例。
在 Illuminate\Events\Dispatcher
的 listen
方法中,我们得以窥见事件及对应监听器处理逻辑的注册源码:
public function listen($events, $listener = null)
{
if ($events instanceof Closure) {
return $this->listen($this->firstClosureParameterType($events), $events);
} elseif ($events instanceof QueuedClosure) {
return $this->listen($this->firstClosureParameterType($events->closure), $events->resolve());
} elseif ($listener instanceof QueuedClosure) {
$listener = $listener->resolve();
}
foreach ((array) $events as $event) {
if (Str::contains($event, '*')) {
$this->setupWildcardListen($event, $listener);
} else {
$this->listeners[$event][] = $this->makeListener($listener);
}
}
}
不论是基于闭包的,还是基于通配符的,还是基于 PHP 类的(这些示例都可以在 Laravel 事件文档中看到),在这里通通一览无余,以我们定义的 $listen
数组为例,最终所有事件类和对应监听器处理逻辑映射关系都被维护到 Illuminate\Events\Dispatcher
的 listeners
数组中,Dispatcher
是以单例模式绑定到服务容器的,所以 listeners
数组在启动期间一经注册完毕,在当前请求生命周期全局有效。
所有事件对应的监听器处理逻辑此时都是闭包函数,只有在对应事件被触发时才会真正执行,我们在执行时再详细剖析 makeListener
方法的底层实现。
事件触发时底层处理逻辑
event
辅助函数对应的实现代码如下:
function event(...$args)
{
return app('events')->dispatch(...$args);
}
这里的 app('events')
会被解析为上面的 Illuminate\Events\Dispatcher
对象实例,所以当我们通过 event
函数触发事件时,实际上调用的是 Illuminate\Events\Dispatcher
类的 dispatch
方法:
public function dispatch($event, $payload = [], $halt = false)
{
[$event, $payload] = $this->parseEventAndPayload(
$event, $payload
);
if ($this->shouldBroadcast($payload)) {
$this->broadcastEvent($payload[0]);
}
$responses = [];
foreach ($this->getListeners($event) as $listener) {
$response = $listener($event, $payload);
if ($halt && ! is_null($response)) {
return $response;
}
if ($response === false) {
break;
}
$responses[] = $response;
}
return $halt ? null : $responses;
}
在这个方法中,我们首先从参数中解析出事件名和负荷数据。
负荷数据在广播时会用到,我们后面介绍广播时再详细探讨它,这里先忽略。
如果这是个广播事件,则进行广播事件推送处理,然后继续往后执行,从 listeners
数组中通过事件名解析出所有与之映射的监听器处理逻辑,由于映射的监听器处理逻辑此时都是闭包函数,所以需要调用对应的闭包函数才能真正执行这些处理逻辑:
$response = $listener($event, $payload);
不同类型监听器底层处理逻辑
我们接下来来分析 makeListener
方法底层是如何通过闭包函数封装监听器的事件处理逻辑的:
public function makeListener($listener, $wildcard = false)
{
if (is_string($listener)) {
return $this->createClassListener($listener, $wildcard);
}
if (is_array($listener) && isset($listener[0]) && is_string($listener[0])) {
return $this->createClassListener($listener, $wildcard);
}
return function ($event, $payload) use ($listener, $wildcard) {
if ($wildcard) {
return $listener($event, $payload);
}
return $listener(...array_values($payload));
};
}
对于字符串类型的监听器类,它会调用 Dispatcher
类的 createClassListener
方法创建监听器类实例:
public function createClassListener($listener, $wildcard = false)
{
return function ($event, $payload) use ($listener, $wildcard) {
if ($wildcard) {
return call_user_func($this->createClassCallable($listener), $event, $payload);
}
$callable = $this->createClassCallable($listener);
return $callable(...array_values($payload));
};
}
该方法又会调用 createClassCallable
方法对监听器类做进一步处理:
protected function createClassCallable($listener)
{
[$class, $method] = is_array($listener)
? $listener
: $this->parseClassCallable($listener);
if (! method_exists($class, $method)) {
$method = '__invoke';
}
if ($this->handlerShouldBeQueued($class)) {
return $this->createQueuedHandlerCallable($class, $method);
}
return [$this->container->make($class), $method];
}
在这个方法中,首先会解析监听器处理事件的方法,默认是 handle
方法,如果该方法不存在,则使用 __invoke
方法(所以在 IncreasePostViews
中,还可以定义 __invoke
方法替代 handle
),如果监听器类实现了 ShouldQueue
接口,则调用 createQueuedHandlerCallable
方法定义将其推送到队列的闭包函数:
protected function createQueuedHandlerCallable($class, $method)
{
return function () use ($class, $method) {
$arguments = array_map(function ($a) {
return is_object($a) ? clone $a : $a;
}, func_get_args());
if ($this->handlerWantsToBeQueued($class, $arguments)) {
$this->queueHandler($class, $method, $arguments);
}
};
}
这里还有一个判断逻辑,handlerWantsToBeQueued
会基于监听器类定义的 shouldQueue
方法判断当前事件监听器是否满足推送到队列执行的条件(所以可以在事件监听器类中基于这个方法实现按条件推送到队列),如果不满足也不会推送到队列,如果满足则调用 queueHandler
方法将当前事件监听器作为任务类推送到队列:
这里的队列服务实例正是从 EventServiceProvider
注册 events
服务时通过 setQueueResolver
设置的队列服务中解析出来的,最终对应的是 QueueManager
对象实例,这里可以基于事件监听器定义的 connection
、queue
、delay
属性解析队列连接、名称和延迟推送时间,如果监听器类没有定义,则使用默认值,后面的实现源码想必我也不用贴出来了,参考前面消息队列底层源码即可(当前是基于 Redis 驱动的队列系统,对应的队列实现类是 RedisQueue
)。
回到 createClassCallable
方法,如果当前监听器类没有实现 ShouldQueue
接口,则直接以数组形式返回当前监听器类对象实例和处理事件的方法,以 IncreasePostViews
为例,是 handle
方法。而在上一层 createClassListener
方法中,不管推送到队列还是直接执行,所有事件监听器处理逻辑最终都会通过闭包函数封装返回给上一层调用代码。
回到最上层 makeListener
方法,如果是通配符事件或者基于闭包含函数定义的监听器则在前面处理的基础上再包裹一层闭包函数返回。
至此,我们就取得了所有类型事件监听器的处理逻辑闭包函数:
- 对于字符串类型的监听器类,如果实现了
ShouldQueue
接口,则返回推送到队列的闭包函数,否则返回直接执行监听器实例处理方法的闭包函数; - 对于通配符事件监听器和基于闭包的事件监听器,则在之前处理基础上在外层再包裹一层闭包函数返回。
这样,当我们在 Illuminate\Events\Dispatcher
类的 dispatch
方法中调用如下这行代码时:
$response = $listener($event, $payload);
$listener
是一个支持 $event
和 $payload
参数的闭包函数,如果是实现了 ShouldQueue
接口的监听器,则执行该闭包函数会推送监听器类到消息队列,等待被处理进程消费;如果是没有实现 ShouldQueue
接口的监听器,则直接执行处理器方法;其他情况则执行相应的闭包函数。
小结
好了,到这里,你应该对 Laravel 事件监听和处理的全貌了然于胸了吧,事件的监听处理和队列推送消费很像,都是把生产者和消费者隔离,从而降低业务代码的耦合,提高系统的水平扩展性,而且事件处理部分也可以推送到队列处理,进而提升系统性能,这个时候,事件监听和处理就演化成了基于事件订阅的消息队列系统了。
2 Comments
文章的第一个命令sail artisan make:event PostViewed 里面的sail是不是应该换成php啊
我使用的是 sail 开发环境:https://laravelacademy.org/post/22173