队列
1、介绍
Laravel 队列为不同的后台队列服务提供统一的API,例如Beanstalk,Amazon SQS, Redis,甚至其他基于关系型数据库的队列。队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短Web请求和相应的时间。
队列配置文件存放在 config/queue.php
。每一种队列驱动的配置都可以在该文件中找到,包括数据库、Beanstalkd、Amazon SQS、Redis以及同步(本地使用)驱动。其中还包含了一个 null
队列驱动用于那些放弃队列的任务。
连接 Vs. 队列
在开始使用 Laravel 队列以前,了解“连接”和“队列”的关系非常重要。在配置文件 config/queue.php
有一个 connections
配置项。该配置项定义了后台队列服务的特定连接,如 Amazon SQS, Beanstalk, 或 Redis。每种队列连接都可以有很多队列,可以想象在银行办理现金业务的各个窗口队列。
请注意 queue
配置文件中的每个连接配置示例都有一个 queue
属性。当新的队列任务被添加到指定的连接时,该配置项的值就是默认监听的队列(名称)。换种说法,如果你没有指派特别的队列名称,那么 queue
的值,也是该任务默认添加到的队列(名称):
// 以下的任务将被委派到默认队列...
dispatch(new Job);
// 以下任务将被委派到 "emails" 队列...
dispatch((new Job)->onQueue('emails'));
有些应用并不需要将任务分配到多个队列,单个队列已经非常适用。但是,应用的任务有优先级差异或者类别差异的时候,推送任务到多个队列将是更好地选择,因为 Laravel 的队列进程支持通过优先级指定处理的队列。举个例子,你可以将高优先级的任务委派到high
(高优先级)队列,从而让它优先执行。
php artisan queue:work --queue=high,default
驱动预备知识
数据库
要使用 database
队列驱动,你需要数据表保存任务信息。要生成创建这些表的迁移,可以运行 Artisan 命令 queue:table
,迁移被创建之后,可以使用 migrate
命令生成这些表:
php artisan queue:table
php artisan migrate
Redis
要使用 redis
队列驱动,需要在配置文件 config/database.php
中配置 Redis 数据库连接。
如果 Redis 队列连接使用 Redis Cluster(集群),队列名称必须包含 key hash tag,以确保给定队列对应的所有 Redis keys 都存放到同一个 hash slot:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
其他驱动预备知识
如果使用以下几种队列驱动,需要安装相应的依赖:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~3.0
- Redis:
predis/predis ~1.0
2、创建任务
生成任务类
通常,所有的任务类都保存在 app/Jobs
目录.如果 app/Jobs
不存在,在运行 Artisan 命令 make:job
的时候,它将会自动创建。你可以通过 Artisan CLI 来生成队列任务类:
php artisan make:job SendReminderEmail
生成的类都实现了 Illuminate\Contracts\Queue\ShouldQueue
接口, 告诉 Laravel 将该任务推送到队列,而不是立即运行。
任务类结构
任务类非常简单,通常只包含处理该任务的 handle
方法,让我们看一个任务类的例子。在这个例子中,我们模拟管理播客发布服务,并在发布以前上传相应的播客文件:
<?php
namespace App\Jobs;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建任务实例
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 执行任务
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 处理上传的播客…
}
}
在本示例中,我们将 Eloquent 模型作为参数直接传递到构造函数。因为该任务使用了 SerializesModels
trait,Eloquent 模型将会在任务被执行时优雅地序列化和反序列化。如果你的队列任务在构造函数中接收Eloquent 模型,只有模型的主键会被序列化到队列,当任务真正被执行的时候,队列系统会自动从数据库中获取整个模型实例。这对应用而言是完全透明的,从而避免序列化整个 Eloquent 模型实例引起的问题。
handle
方法在任务被处理的时候调用,注意我们可以在任务的 handle
方法中进行依赖注入。Laravel 服务容器会自动注入这些依赖。
注:二进制数据,如原生图片内容,在传递给队列任务之前先经过 base64_encode
方法处理,此外,该任务被推送到队列时将不会被序列化为 JSON 格式。
3、分发任务
创建好任务类后,就可以通过辅助函数 dispatch
将其分发到队列。辅助函数 dispatch
需要的唯一参数就是该任务的实例:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 保存新的播客.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
dispatch(new ProcessPodcast($podcast));
}
}
注:辅助函数 dispatch
是全局且易用的函数,而且非常容易测试。通过 Laravel 的测试文档可以了解更多细节。
延时分发
有时候你可能想要延迟队列任务的执行,可以通过在任务实例使用 delay
方法。该方法由Illuminate\Bus\Queueable
trait提供,已经自动添加在通过命令行生成的任务类中。例如你希望将某个任务在创建 10 分钟以后才执行:
<?php
namespace App\Http\Controllers;
use Carbon\Carbon;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 保存播客.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
$job = (new ProcessPodcast($pocast))
->delay(Carbon::now()->addMinutes(10));
dispatch($job);
}
}
注:Amazon SQS 的队列服务最长延时 15 分钟。
自定义队列和连接
分发到指定的队列
通过推送任务到不同队列,你可以将队列任务进行“分类”,甚至根据优先级来分配每个队列的进程数。请注意,这并不意味着使用了配置项中那些不同的连接来管理队列,实际上只有单一连接会被用到。要指定队列,请在任务实例使用 onQueue
方法:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 保存播客
*
* @param Request $request
* @return Response
* @translator laravelacademy.org
*/
public function store(Request $request)
{
// 创建播客...
$job = (new ProcessPodcast($podcast))->onQueue('processing');
dispatch($job);
}
}
分发到指定的连接
如果你使用了多个连接来管理队列,那么可以分发任务到指定的连接。请在任务实例中使用 onConnection
方法来指定连接:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 保存播客
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客
$job = (new ProcessPodcast($podcast))->onConnection('sqs');
dispatch($job);
}
}
当然,你可以同时使用 onConnection
和onQueue
方法来指定任务的连接和队列:
$job = (new ProcessPodcast($podcast))
->onConnection('sqs')
->onQueue('processing');
指定最大失败次数/超时时间
最大失败次数
指定队列任务最大失败次数的一种实现方式是通过 Artisan 命令 --tries
切换:
php artisan queue:work --tries=3
不过,你还可以在任务类自身定义最大失败次数来实现更加细粒度的控制,如果最大失败次数在任务中指定,则其优先级高于命令行指定的数值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}
超时
类似的,队列任务最大运行时长(秒)可以通过 Artisan 命令 --timeout
来指定:
php artisan queue:work --timeout=30
同样,你也可以在任务类中定义该任务允许运行的最大时长(单位:秒),任务中指定的超市时间优先级也高于命令行定义的数值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}
处理错误
如果任务在处理的时候有异常抛出,则该任务将会被自动释放回队列以便再次尝试执行。任务会持续被释放直到尝试次数达到应用允许的最大次数。最大尝试次数通过 Artisan 命令 queue:work
上的 --tries
开关来定义。此外,该次数也可以在任务类自身上定义。关于运行队列监听器的更多信息可以在下面看到。
4、运行队列进程
Laravel 包含了一个队列进程用来处理被推送到队列的新任务。你可以使用 queue:work
命令运行队列进程。请注意,队列进程开始运行后,会持续监听队列,直至你手动停止或关闭终端:
php artisan queue:work
注:为了保持队列进程 queue:work
持续在后台运行,需要使用进程守护程序,比如 Supervisor 来确保队列进程持续运行。
请记住,队列进程是长生命周期的进程,会在启动后驻留内存。若应用有任何改动将不会影响到已经启动的进程。所以请在发布程序后,重启队列进程。
指定连接和队列
队列进程同样可以自定义连接和队列。传递给 work
命令的连接名需要与 config/queue.php
中定义的某个连接相配置:
php artisan queue:work redis
你可以自定义将某个队列进程指定某个连接来管理。举例来说,如果所有的邮件任务都是通过 redis
连接上的 emails
队列处理,那么可以用以下命令来启动单一进程只处理单一队列:
php artisan queue:work redis --queue=emails
资源注意事项
后台队列进程不会再处理每个任务前重启框架,因此你需要在每次任务完成后释放所有重量级的资源。例如,如果你在使用 GD 库处理图片,需要在完成的时候使用 imagedestroy
来释放内存。
队列优先级
有时候你需要区分任务的优先级。比如,在配置文件config/queue.php
中,你可能定义 redis
连接的默认 queue
为 low
。 可是,偶尔需要将任务分发到高优先级 high
, 可以这么做:
dispatch((new Job)->onQueue('high'));
如果期望所有 high
高优先级的队列都将先于low
低优先级的任务执行,可以像这样启动队列进程:
php artisan queue:work --queue=high,low
队列进程 & 部署
前文已经提到队列进程是长生命周期的进程,在重启以前,所有源码的修改并不会对其产生影响。所以,最简单的方法是在每次发布新版本后重新启动队列进程。你可以通过 Aritisan 命令 queue:restart
来优雅地重启队列进程:
php artisan queue:restart
该命令将在队列进程完成正在进行的任务后,结束该进程,避免队列任务的丢失或错误。由于队列进程会在执行 queue:restart
命令后死掉,你仍然需要通过进程守护程序如 Supervisor 来自动重启队列进程。
任务过期和超时
任务执行时间
在配置文件 config/queue.php
中,每个连接都定义了 retry_after
项。该配置项的目的是定义任务在执行以后多少秒后释放回队列。如果retry_after
设定的值为 90
, 任务在运行 90
秒后还未完成,那么将被释放回队列而不是删除掉。毫无疑问,你需要把 retry_after
的值设定为任务执行时间的最大可能值。
注:只有 Amazon SQS 配置信息不包含 retry_after
项。Amazon SQS 的任务执行时间基于 Default Visibility Timeout ,该项在 Amazon AWS 控制台配置。
队列进程超时
队列进程 queue:work
可以设定超时 --timeout
项。该 --timeout
控制队列进程执行每个任务的最长时间,如果超时,该进程将被关闭。各种错误都可能导致某个任务处于“冻结”状态,比如 HTTP 无响应等。队列进程超时就是为了将这些“冻结”的进程关闭:
php artisan queue:work --timeout=60
配置项 retry_after
和 Aritisan 参数项 --timeout
不同,但目的都是为了确保任务的安全,并且只被成功的执行一次。
注:参数项--timeout
的值应该是中小于配置项retry_after
的值,这是为了确保队列进程总在任务重试以前关闭。如果--timeout
比retry_after
大,则你的任务可能被执行两次。
进行休眠时间
当任务在队列中有效时,进程会持续处理任务,没有延迟。不过,我们可以使用 sleep
配置项来指定没有新的有效任务产生时的休眠时间:
php artisan queue:work --sleep=3
5、配置 Supervisor
安装Supervisor
Supervisor 是 Linux 系统中常用的进程守护程序。如果队列进程 queue:work
意外关闭,它会自动重启启动队列进程。在 Ubuntu 安装Supervisor 非常简单:
sudo apt-get install supervisor
注:如果自己配置 Supervisor 有困难,可以考虑使用 Laravel Forge,它会为 Laravel 项目自动安装并配置 Supervisor。
配置Supervisor
Supervisor 配置文件通常存放在 /etc/supervisor/conf.d
目录,在该目录中,可以创建多个配置文件指示 Supervisor
如何监视进程,例如,让我们创建一个开启并监视queue:work
进程的 laravel-worker.conf
文件:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
在本例中,numprocs
指令让 Supervisor 运行 8 个 queue:work
进程并监视它们,如果失败的话自动重启。当然,你需要修改 queue:work sqs
的 command
指令来映射你的队列连接。
启动Supervisor
当你成功创建配置文件后,你需要刷新 Supervisor 的配置信息并使用如下命令启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
你可以通过 Supervisor 官方文档获的更多信息。
6、处理失败的任务
不可避免会出现失败的任务。不必担心,Laravel 很容易设置任务允许的最大尝试次数,若是执行次数达到该限定,任务会被插入到 failed_jobs
表, 要创建一个 failed_jobs
表的迁移,可以使用 queue:failed-table
命令
php artisan queue:failed-table
php artisan migrate
然后,运行队列进程时,通过 --tries
参数项来设置队列任务允许的最大尝试次数,如果没有指定 --tries
选项的值,任务会被无限期重试:
php artisan queue:work redis --tries=3
清理失败的任务
你可以在任务类中定义 failed
方法, 从而允许你在失败发生时执行指定的动作,比如发送任务失败的通知,记录日志等。导致任务失败的 Exception
会被传递到 failed
方法:
<?php
namespace App\Jobs;
use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* Create a new job instance.
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* Execute the job.
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// Process uploaded podcast...
}
/**
* The job failed to process.
*
* @param Exception $exception
* @return void
*/
public function failed(Exception $e)
{
// 发送失败通知, etc...
}
}
任务失败事件
如果你期望在任务失败的时候触发某个事件,可以使用 Queue::failing
方法。该事件通过邮件或HipChat通知团队。举个例子,我么可以在 Laravel 自带的AppServiceProvider
中附加一个回调到该事件:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;
class AppServiceProvider extends ServiceProvider
{
/**
* 启动应用服务.
*
* @return void
*/
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
/**
* 注册服务提供者.
*
* @return void
*/
public function register()
{
//
}
}
重试失败的任务
要查看已插入到 failed_jobs
数据表中的所有失败任务,可以使用 Artisan 命 queue:failed
:
php artisan queue:failed
该命令将会列出任务ID,连接,队列和失败时间,任务ID可用于重试失败任务,例如,要重试一个ID为5
的失败任务,要用到下面的命令
php artisan queue:retry 5
要重试所有失败任务,使用如下命令即可:
php artisan queue:retry all
如果你要删除一个失败任务,可以使用queue:forget
命令:
php artisan queue:forget 5
要删除所有失败任务,可以使用queue:flush
命令
php artisan queue:flush
7、任务事件
当你使用 Queue
门面的时候,可以使用 before
和 after
方法。你可以自定义在任务开始前或者结束后执行某个回调。这些回调可用来记录日志或者记录统计数据。通常,你可以在服务提供者中使用这些方法。比如,我们可能在AppServiceProvider
这样用:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* Bootstrap any application services.
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
/**
* Register the service provider.
*
* @return void
*/
public function register()
{
//
}
}
使用 Queue
门面上的 looping
方法,你可以在进程尝试从队列中获取任务之前指定要执行的回调。例如,你可以注册一个闭包来回调之前失败任务遗留下来的事务:
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});
14 Comments
$job = (new DelayCommand()) ->delay(Carbon::now()->addMinutes(10)); dispatch($job);