Laravel 消息队列的优先级和失败任务重试实现
上篇教程发布后,有同学反馈消息队列的优先级怎么实现,Laravel 本身对此提供了支持,除此之外,Laravel 的队列组件还支持批处理、延迟推送、失败任务处理、消息队列中间件、频率限制等很多特性,一篇教程根本介绍不完,毕竟消息队列也是个很复杂的系统,但是放到这里来讲似乎又偏离了 Redis 这个主题,所以这里学院君先给大家简单介绍下消息队列优先级和失败任务处理的实现,至于更多功能特性,后面单独开一个消息队列专题进行系统介绍。
队列优先级
我们可以推送任何任务作为消息数据到队列系统,但是不同任务的优先级是不同的,比如一个订单支付任务的优先级肯定是要高于文章浏览数更新这种一般任务,那么如何让队列按照优先级处理不同任务呢?
推送任务到不同的队列
Laravel 队列组件本身支持推送任务到多个队列,然后在处理队列任务时通过指定读取队列的顺序实现队列优先级的效果,并不是像数据结构底层那样基于堆排序实现队列优先级,这一点需要知悉。
我们可以在分发任务时通过 onQueue
方法显式指定推送的队列名称(不调用该方法默认是 default
):
dispatch(new PostViewsIncrement($post))->onQueue('default');
如果还有另一个用于订阅支付的 SubscriptionPayment
任务,可以指定将其推送到 payment
队列:
dispatch(new SubscriptionPayment($order))->onQueue('payment');
此外,你还可以在任务类中通过 queue
属性指定该任务被推送到的队列:
<?php
...
class SubscriptionPayment implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public string $queue = 'payment';
...
}
这样一来,在分发任务时就不需要显式调用 onQueue
方法了:
dispatch(new SubscriptionPayment($order));
指定队列任务的处理顺序
接下来,在启动队列任务处理进程(Worker)时,可以通过 --queue
选项指定该处理进程消费的队列和先后顺序:
php artisan queue:work --queue=payment,default
不指定默认处理的是
default
队列中的任务,你当然可以通过--queue=payment
指定进程只处理payment
队列中的任务,然后再启动一个处理进程单独处理default
队列中的任务,但是这样的弊端是如果payment
队列是空的话,就使得整个进程处于空闲状态,白白浪费了系统资源。
在底层,Laravel 会根据处理进程启动时指定的队列顺序依次读取每个队列中的任务进行处理,对应的源码位于 Worker
类的 getNextJob
方法中:
protected function getNextJob($connection, $queue)
{
$popJobCallback = function ($queue) use ($connection) {
return $connection->pop($queue);
};
try {
if (isset(static::$popCallbacks[$this->name])) {
return (static::$popCallbacks[$this->name])($popJobCallback, $queue);
}
// 依次读取队列列表中每个队列中的任务进行处理
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $popJobCallback($queue))) {
return $job;
}
}
} catch (Throwable $e) {
...
}
}
这样一来,我们就可以基于此特性实现队列优先级功能 —— 先处理 payment
队列中的任务,再处理 default
队列中的任务,最终呈现的效果就是 payment
队列优先级高于 default
。
实现消息队列的负载均衡
但是这也会引出另一个问题 —— 如果 payment
队列负载较高,一直处理繁忙状态,那么 default
队列中的任务就会一直阻塞,没有机会执行,为了解决这个问题,一种方案是多开几个同样的处理进程,提高 payment
队列的处理进度:
php artisan queue:work --queue=payment,default
php artisan queue:work --queue=payment,default
php artisan queue:work --queue=payment,default
php artisan queue:work --queue=payment,default
php artisan queue:work --queue=payment,default
但是在业务高峰期,可能这也还是解决不了问题,而且具体要开几个处理进程也是无法准确预判的,要彻底解决这个问题,可以另开几个优先处理 default
队列的进程:
php artisan queue:work --queue=default,payment
php artisan queue:work --queue=default,payment
php artisan queue:work --queue=default,payment
php artisan queue:work --queue=default,payment
php artisan queue:work --queue=default,payment
这样一来,就可以实现消息队列的负载均衡了,前 5 个进程优先处理 payment
队列任务,后 5 个进程优先处理 default
队列任务,如果 payment
为空,则可以全部用于处理 default
队列中的任务。当然了,这里只是一个简单的示例,具体比例如何设置,取决于你自己的业务负载。
在生产环境,可以通过 Supervisor 配置管理这些队列任务处理进程。
失败任务重试
基于 Webhook 推送消息到其他应用
以上演示的都是同一个应用内部的消息数据推送,此外,我们还可以借助 Webhook 实现不同应用之间的消息推送。
在应用中集成过第三方服务的同学应该对 Webhook 很熟悉,在业务流程的某个节点,通过第三方服务接口对应用状态做更新后,由于这种网络请求是异步操作,响应时间是未知的,需要通过 Webhook 获取第三方服务的更新结果。
作为第一方应用,我们也可以对外提供这种 Webhook URL,告知第三方以应用服务接口的响应结果,我们把响应数据看作消息的话,这个时候,我们的第一方应用是消息数据的生产者,调用我们服务等待响应结果的第三方应用是处理消息数据的消费者(Worker),在生产者一方,对于这种将响应结果通过 HTTP 请求发送给调用方的操作,我们可以基于消息队列来管理,因为 HTTP 请求是耗时的网络 IO,执行时间不确定,另外既然是网络请求,网络的稳定性无法保证,如果断网导致请求失败,需要进行重试。
创建任务类并分发
为此我们可以创建一个 SendWebhook
类作为消息队列的任务类:
php artisan make:job SendWebhook
然后在 SendWebhook
中编写响应数据的处理结果:
<?php
namespace App\Jobs;
use App\Services\Service;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Http;
class SendWebhook implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public $queue = 'service';
public Service $service;
public array $data;
/**
* Create a new job instance.
*
* @param Service $service
* @param array $data
*/
public function __construct(Service $service, array $data)
{
$this->service = $service;
$this->data = $data;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
// 基于 HTTP 请求发送响应给调用方
Http::timeout(5)->post($this->service->url, $this->data);
}
}
在这个任务类中,我们通过 queue
属性指定了要推送到的队列名称是 service
,在 handle
方法中,使用了 HTTP 客户端 API 发送响应给调用方,并设置了请求超时时间是 5s。
这样一来,我们就可以在完成第三方请求响应处理后,通过分发这个任务进行异步的响应处理:
dispatch(new SendWebhook($service, $data));
失败任务重试
前面我们说了,这里存在网络请求,网络稳定性无法保证,很有可能出现断网导致请求失败的情况,这个时候,我们就需要对执行失败的任务进行重试,这可以通过在启动处理进程时指定 --tries
选项实现:
php artisan queue:work --queue=service,default --tries=3
这里指定了该进程处理的所有队列任务总的执行次数是 3(第一次运行失败后,还会重试两次),如果你觉得不需要这么笼统的设置,可以在任务类中自定义任务失败后的重试机制:
public function handle()
{
// 基于 HTTP 请求发送响应给调用方
$response = Http::timeout(5)->post($this->service->url, $this->data);
// 如果响应失败,则将此任务再次推送到队列进行重试
if ($response->failed()) {
// 延迟 10s 后推送,默认是 0,表示立即推送
$this->release(10);
}
}
你可以可以根据已执行次数依次递增延迟时间:
if ($response->failed()) {
// 第一次重试延迟 10s,第二次延迟 20s,依次类推...
$this->release(10 * $this->attempts());
}
如果没有在处理器命令启动时指定 tries
选项,那么这个任务会无休无止地重试下去,直到执行成功,你可以通过在任务类中定义一个 $tries
属性指定最大尝试次数:
public int $tries = 3;
还可以新增一个 retryUntil
方法定义任务过期时间(到达过期时间后不再重试,定义 retryUntil
属性亦可):
// 重试过期时间
public function retryUtil()
{
// 1 天后
return now()->addDay();
}
这样一来,会重试这个任务一天,一天后不再重试。如果同时设置了 tries
属性和 retryUntil
方法,则以先到达的终止条件为准。
最后,如果所有尝试次数用尽还未执行成功,则将该任务标记为执行失败,我们可以在任务类中定义一个 failed
方法编写任务执行失败后的业务逻辑:
// 任务执行失败后发送邮件通知给相关人员
public function failed(Throwable $exception)
{
Mail::to($this->service->developer->email)->send(...);
}
执行失败的任务会存储到 failed_jobs
数据表中。对于执行失败的任务,可以通过 Artisan 命令 queue:retry
进行再次重试。具体细节参考官方文档即可,这里不再演示了。
No Comments