队列
简介
注:Laravel 现在提供了基于 Redis 的,拥有美观的后台和配置系统的 Horizon 队列扩展包,完整信息参考 Horizon文档。
Laravel 队列为不同的后台队列服务提供了统一的 API,例如 Beanstalk,Amazon SQS,Redis,甚至其他基于关系型数据库的队列。队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和响应的时间。
队列配置文件存放在 config/queue.php
。每一种队列驱动的配置都可以在该文件中找到,包括数据库、Beanstalkd、Amazon SQS、Redis以及同步(本地使用)驱动。其中还包含了一个 null
队列驱动用于那些放弃队列的任务。
连接 Vs. 队列
在开始使用 Laravel 队列以前,了解“连接”和“队列”的关系非常重要。在配置文件 config/queue.php
有一个 connections
配置项。该配置项定义了后台队列服务的特定连接,如 Amazon SQS、Beanstalk 或 Redis。每种队列连接都可以有很多队列,可以想象在银行办理现金业务的各个窗口队列。
请注意 queue
配置文件中的每个连接配置示例都有一个 queue
属性。当新的队列任务被添加到指定的连接时,该配置项的值就是默认监听的队列(名称)。换种说法,如果你没有指派特别的队列名称,那么 queue
的值,也是该任务默认添加到的队列(名称):
// 以下的任务将被委派到默认队列...
Job::dispatch();
// 以下任务将被委派到 "emails" 队列...
Job::dispatch()->onQueue('emails');
有些应用并不需要将任务分配到多个队列,单个队列已经非常适用。但是,应用的任务有优先级差异或者类别差异的时候,推送任务到多个队列将是更好地选择,因为 Laravel 的队列进程支持通过优先级指定处理的队列。举个例子,你可以将高优先级的任务委派到 high
(高优先级)队列,从而让它优先执行。
php artisan queue:work --queue=high,default
驱动预备知识
数据库
要使用 database
队列驱动,你需要数据表保存任务信息。要生成创建这些表的迁移,可以运行 Artisan 命令 queue:table
,迁移被创建之后,可以使用 migrate
命令生成这些表:
php artisan queue:table
php artisan migrate
Redis
要使用 redis
队列驱动,需要在配置文件 config/database.php
中配置 Redis 数据库连接。
Redis 集群
如果 Redis 队列连接使用 Redis Cluster(集群),队列名称必须包含 key hash tag,以确保给定队列对应的所有 Redis keys 都存放到同一个 hash slot:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
学院君注:对一般中小型应用推荐使用 Redis 作为队列驱动。
阻塞
使用 Redis 队列时,可以使用 block_for
配置项来指定驱动在迭代队列进程循环并重新轮询 Redis 数据库之前等待可用队列任务的时间。
根据队列负载来调整此配置值会比轮询 Redis 数据库来查找新任务更加高效。例如,你可以设置该值为 5 来告诉驱动在等待可用队列任务时需要阻塞五秒:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'retry_after' => 90,
'block_for' => 5,
],
注:将
block_for
设置为0
会导致队列进程一直阻塞直到出现待执行的有效任务,同时也能防止执行下一个任务前类似SIGTERM
这种程序中断信号被处理。
其他驱动
如果使用以下几种队列驱动,需要安装相应的依赖:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~4.0
- Redis:
predis/predis ~1.0
或者 PHP 扩展phpredis
。
创建任务
生成任务类
通常,所有的任务类都保存在 app/Jobs
目录。如果 app/Jobs
不存在,在运行 Artisan 命令 make:job
的时候,它将会自动创建。你可以通过 Artisan CLI 来生成队列任务类:
php artisan make:job ProcessPodcast
生成的类都实现了 Illuminate\Contracts\Queue\ShouldQueue
接口,这是一个空接口,用于告知 Laravel 将该任务推送到队列,而不是立即运行。
注:任务类桩文件可以通过桩发布来自定义。
任务类结构
任务类非常简单,通常只包含处理该任务的 handle
方法,让我们看一个任务类的例子。在这个例子中,我们模拟管理播客发布服务,并在发布以前上传相应的播客文件:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建任务实例
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 执行任务
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 处理上传的播客…
}
}
在本示例中,我们将 Eloquent 模型作为参数直接传递到构造函数。因为该任务使用了 SerializesModels
trait,Eloquent 模型将会在任务被执行时优雅地序列化和反序列化。如果你的队列任务在构造函数中接收 Eloquent 模型,只有模型的主键会被序列化到队列,当任务真正被执行的时候,队列系统会自动从数据库中获取整个模型实例。这对应用而言是完全透明的,从而避免序列化整个 Eloquent 模型实例引起的问题。
handle
方法在任务被处理的时候调用,注意我们可以在任务的 handle
方法中进行依赖注入。Laravel 服务容器会自动注入这些依赖。
如果你想要完全控制容器如何将依赖注入到 handle
方法,可以使用容器的 bindMethod
方法。bindMethod
方法接收一个回调,该回调支持传入任务和容器实例,在这个回调中,你可以随意调用 handle
方法。通常,我们在某个服务提供者中调用这个方法:
use App\Jobs\ProcessPodcast;
$this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) {
return $job->handle($app->make(AudioProcessor::class));
});
注:二进制数据,如原生图片内容,在传递给队列任务之前先经过
base64_encode
方法处理,此外,该任务被推送到队列时将不会被序列化为 JSON 格式。
处理关联关系
由于被加载的关联模型也会被序列化,这可能会导致序列化后的任务字符串变得很大,要阻止关联模型一起被序列化,可以在设置属性值时调用模型类上的 withoutRelations
方法,该方法会返回一个不包含关联关系的模型实例:
/**
* Create a new job instance.
*
* @param \App\Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast->withoutRelations();
}
任务中间件
任务中间件允许你在执行队列任务前封装一些自定义逻辑,从而减少任务本身的模板代码量。例如,考虑下面这个 handle
方法,它使用了 Laravel 的 Redis 限制频率功能,从而限定每五秒钟只能处理一个任务:
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');
// Handle job...
}, function () {
// Could not obtain lock...
return $this->release(5);
});
}
尽管这段代码是有效的,但是由于混入了 Redis 频率限制逻辑,handle
方法变得很臃肿,此外,这个频率限制逻辑在其它需要限制频率的任务中也要重复编写。
我们可以通过定义任务中间件来处理频率限制,而不是在队列任务处理方法中完成。Laravel 没有默认的位置来存放任务中间件,因此,你可以将它们存放在任何地方。在本例中,我们将其存放在 app/Jobs/Middleware
目录下:
<?php
namespace App\Jobs\Middleware;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* Process the queued job.
*
* @param mixed $job
* @param callable $next
* @return mixed
*/
public function handle($job, $next)
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Lock obtained...
$next($job);
}, function () use ($job) {
// Could not obtain lock...
$job->release(5);
});
}
}
正如你所看到的,和路由中间件一样,任务中间件接收待处理的任务以及一个回调作为参数,该回调会在中间件逻辑处理完毕后继续处理任务。
创建好任务中间件后,可以在任务类的 middleware
方法中将其添加到对应任务中,该方法在执行 Artisan 命令 make:job
时默认没有自动生成,所以你需要手动编写该方法实现代码:
use App\Jobs\Middleware\RateLimited;
/**
* Get the middlewarwe the job should pass through.
*
* @return array
*/
public function middleware()
{
return [new RateLimited];
}
分发任务
创建好任务类后,就可以通过任务自身的 dispatch
方法将其分发到队列。dispatch
方法需要的唯一参数就是该任务的实例:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...
ProcessPodcast::dispatch($podcast);
}
}
如果你想要按条件分发任务,可以使用 dispatchIf
和 dispatchUnless
方法:
ProcessPodcast::dispatchIf($accountActive = true, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended = false, $podcast);
延时分发
有时候你可能想要延迟队列任务的执行,这可以通过在分发任务时使用 delay
方法实现。例如你希望将某个任务在创建 10 分钟以后才执行:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
}
}
注:Amazon SQS 的队列服务最长延时 15 分钟。
响应发送到客户端之后分发
此外,还可以使用 dispatchAfterResponse
方法在响应发送给用户浏览器之后再分发任务,这样一来,即使队列任务在执行的情况下,用户仍然继续使用应用,这种场景适用于任务耗时较长的情况,比如发送邮件:
use App\Jobs\SendNotification;
SendNotification::dispatchAfterResponse();
你也可以通过传入闭包到 dispatch
方法,再链接 afterResponse
方法的方式实现上述同样的效果,只是这个时候,需要把待执行的队列任务代码放到闭包中实现:
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
dispatch(function () {
Mail::to('taylor@laravel.com')->send(new WelcomeMessage);
})->afterResponse();
同步分发
如果你立即想要分发任务(同步),可以使用 dispatchNow
方法。使用这个方法时,对应任务不会被推送到队列,而是立即在当前进程中运行:
<?php
namespace App\Http\Controllers;
use Illuminate\Http\Request;
use App\Jobs\ProcessPodcast;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...
ProcessPodcast::dispatchNow($podcast);
}
}
任务链
任务链允许你指定一个需要在一个序列中执行的队列任务列表,如果序列中的某个任务失败,其它任务将不再运行。要执行一个队列任务链,可以使用 Bus
门面提供的 chain
方法:
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();
除了链接任务类实例,还可以链接闭包实现的任务:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(...);
},
])->dispatch();
注:使用
$this->delete()
方法删除任务不会阻断正在被处理的任务链中的任务。任务链中的任务只有在执行失败时才会停止执行。
链接连接 & 队列
如果你想要指定任务链使用的默认连接和队列,可以使用 onConnection
和 onQueue
方法。这些方法指定需要用到的队列连接和队列名称,除非队列任务显式指定了分配的连接/队列:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();
失败处理
链接队列任务时,你可以使用 catch
方法指定一个任务链中任务执行失败时调用的闭包,该闭包回调接收导致任务失败的异常实例作为参数:
use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// A job within the chain has failed...
})->dispatch();
自定义队列 & 连接
分发到指定的队列
通过推送任务到不同队列,你可以将队列任务进行“分类”,甚至根据优先级来分配每个队列的进程数。请注意,这并不意味着使用了配置项中那些不同的连接来管理队列,实际上只有单一连接会被用到。要指定队列,请在任务实例使用 onQueue
方法:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
}
}
分发到指定的连接
如果你使用了多个连接来管理队列,那么可以分发任务到指定的连接。请在任务实例中使用 onConnection
方法来指定连接:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
}
}
当然,你可以同时使用 onConnection
和onQueue
方法来指定任务的连接和队列:
$job = (new ProcessPodcast($podcast))
->onConnection('sqs')
->onQueue('processing');
指定最大失败次数/超时时间
最大失败次数
指定队列任务最大失败次数的一种实现方式是通过 Artisan 命令 --tries
切换:
php artisan queue:work --tries=3
不过,你还可以在任务类自身定义最大失败次数来实现更加细粒度的控制,如果最大失败次数在任务中指定,则其优先级高于命令行指定的数值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}
基于时间的尝试次数
除了定义在任务失败前的最大尝试次数外,还可以定义在指定时间内允许任务的最大尝试次数,这可以通过在任务类中添加 retryUntil
方法来实现:
/**
* Determine the time at which the job should timeout.
*
* @return \DateTime
*/
public function retryUntil()
{
return now()->addSeconds(5);
}
注:还可以在队列时间监听器中定义
retryUntil
方法。
最大异常数
有时候你可能希望指定某个任务尝试多次,但是如果触发指定次数异常后失败退出,要实现这个功能,可以在任务类中定义 maxExceptions
属性:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 25;
/**
* The maximum number of exceptions to allow before failing.
*
* @var int
*/
public $maxExceptions = 3;
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Lock obtained, process the podcast...
}, function () {
// Unable to obtain lock...
return $this->release(10);
});
}
}
在这个例子中,如果应用无法获取 Redis 锁,会释放 10s,再重试,重试上限是 25 次,不过,如果期间 3 次抛出未处理异常,则任务失败退出。
超时
注:
timeout
方法为 PHP 7.1+ 和pcntl
扩展做了优化。
类似的,队列任务最大运行时长(秒)可以通过 Artisan 命令上的 --timeout
开关来指定:
php artisan queue:work --timeout=30
同样,你也可以在任务类中定义该任务允许运行的最大时长(单位:秒),任务中指定的超时时间优先级也高于命令行定义的数值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}
有时候,IO 阻塞进程(例如网络套接字或者对外的 HTTP 连接)可能不会遵守你指定的超时时间,因此,使用这些功能时,还需要同时使用对应进程程序提供的 API 指定超时时间。例如,使用 Guzzle 时,需要在 Guzzle 实例上指定连接和请求的超时时间。
频率限制
注:该功能要求应用可以与 Redis 服务器进行交互。
如果应用使用了 Redis,那么可以使用时间或并发来控制队列任务。该功能特性在队列任务与有频率限制的 API 交互时很有帮助。
例如,通过 throttle
方法,你可以限定给定类型任务每 60 秒只运行 10 次。如果不能获取锁,需要将任务释放回队列以便可以再次执行:
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Job logic...
}, function () {
// Could not obtain lock...
return $this->release(10);
});
注:在上面的例子中,
key
可以是任意可以唯一标识你想要限定访问频率的任务类型的字符串。举个例子,这个键可以基于任务类名和操作 Eloquent 模型的 ID 进行构建。
注:将受限制的任务释放回队列依然会增加任务的总执行次数
attempts
的值。
除此之外,还可以指定可以同时处理给定任务的最大进程数量。这个功能在队列任务正在编辑一次只能由一个任务进行处理的资源时很有用。例如,使用 funnel
方法你可以给定类型任务一次只能由一个工作进程进行处理:
Redis::funnel('key')->limit(1)->then(function () {
// Job logic...
}, function () {
// Could not obtain lock...
return $this->release(10);
});
注:使用频率限制时,任务在运行成功之前需要的最大尝试次数很难权衡,因此,将频率限制和基于时间的尝试次数结合起来使用是个不错的选择。
处理错误
如果任务在处理的时候有异常抛出,则该任务将会被自动释放回队列以便再次尝试执行。任务会持续被释放直到尝试次数达到应用允许的最大次数。最大尝试次数通过 Artisan 命令 queue:work
上的 --tries
开关来定义。此外,该次数也可以在任务类自身上定义。关于运行队列监听器的更多信息可以在下面看到。
任务批处理
Laravel 的队列任务批处理功能让开发者运行批量任务以及在批量任务运行完成后执行一些特定动作变得简单。开始使用这个功能之前,需要通过数据库迁移创建相关的数据表来存放任务批处理元信息,这个数据库迁移文件可以通过内置的 Artisan 命令 queue:batches-table
生成:
php artisan queue:batches-table
php artisan migrate
定义批处理任务
要构建批处理任务,首先需要正常创建一个队列任务,不过,这个任务类需要使用 Illuminate\Bus\Batchable
Trait,该 Trait 中包含了用于获取当前任务所在运行批次的 batch
方法:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
if ($this->batch()->cancelled()) {
// Detected cancelled batch...
return;
}
// Batched job executing...
}
}
分发批处理
要分发一个任务批处理,需要使用 Bus
门面的 batch
方法。当然,批处理在与完成回调结合时才能发挥最大作用,因此,你可以使用 then
、catch
和 finally
方法为批处理定义完成回调。每个回调在被调用时都会获取一个 Illuminate\Bus\Batch
实例作为参数传入:
use App\Jobs\ProcessPodcast;
use App\Podcast;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Batch;
use Throwable;
$batch = Bus::batch([
new ProcessPodcast(Podcast::find(1)),
new ProcessPodcast(Podcast::find(2)),
new ProcessPodcast(Podcast::find(3)),
new ProcessPodcast(Podcast::find(4)),
new ProcessPodcast(Podcast::find(5)),
])->then(function (Batch $batch) {
// 所有任务执行成功...
})->catch(function (Batch $batch, Throwable $e) {
// 第一个批处理任务执行失败...
})->finally(function (Batch $batch) {
// 批处理执行完成(不管有没有失败都会到这里)...
})->dispatch();
return $batch->id;
命名批处理
如果批处理有名字的话,有些工具比如 Laravel Horizon 和 Laravel Telescope 可以为批处理提供更加用户友好的调试信息,要为批处理设置名字,可以在定义批处理时调用 name
方法实现:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有批处理任务执行成功...
})->name('Process Podcasts')->dispatch();
批处理连接 & 队列
如果你想要指定用于批处理任务的连接和队列,可以使用 onConnection
和 onQueue
方法:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->onConnection('redis')->onQueue('podcasts')->dispatch();
添加任务到批处理
有时候在批处理任务中添加额外任务到批处理可能很有用,尤其是当你需要批处理成千上万个队列任务时,需要在 Web 请求期间耗费很长时间进行分发操作。在这种情况下,你可能希望先分发一个初始化的任务加载器批处理,这个加载器后续可以支持水合更多其它队列任务到这个批处理:
$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import Contacts')->dispatch();
在这个例子中,我们可以使用 LoadImportBatch
任务来添加其它任务到批处理,要实现这个功能,还需要在队列任务中调用批处理实例上的 add
方法完成添加:
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
if ($this->batch()->cancelled()) {
return;
}
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}
注:你只能在队列任务中将其它任务添加到当前任务所属的批处理。
检查批处理
传递到批处理完成回调的 Illuminate\Bus\Batch
实例提供了很多属性和方法来帮助你与给定任务批处理进行交互以及对批处理进行检查:
// 批处理的 UUID...
$batch->id;
// 批处理的名字 (如果设置过的话)...
$batch->name;
// 分配给批处理的任务数量...
$batch->totalJobs;
// 尚未被队列处理的任务数量...
$batch->pendingJobs;
// 执行失败的任务数量...
$batch->failedJobs;
// 当前已处理的任务数量...
$batch->processedJobs();
// 批处理的完成百分比 (0-100)...
$batch->progress();
// 批处理是否已经完成运行...
$batch->finished();
// 取消批处理的执行...
$batch->cancel();
// 批处理是否已经取消...
$batch->cancelled();
从路由返回批处理
所有 Illuminate\Bus\Batch
实例均可 JSON 序列化,这意味着你可以直接从应用路由中返回它们,得到的结果是 JSON 格式的批处理信息,其中还包含了批处理的完成进度。如果要通过 ID 获取对应批处理,可以使用 Bus
门面提供的 findBatch
方法实现:
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});
取消批处理
有时候你可能需要取消指定批处理的执行,这可以通过调用 Illuminate\Bus\Batch
实例上的 cancel
方法来完成:
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
if ($this->user->exceedsImportLimit()) {
return $this->batch()->cancel();
}
if ($this->batch()->cancelled()) {
return;
}
}
批处理失败
当一个批处理任务运行失败,会调用 catch
回调(如果定义了的话)。该回调只有在批处理中的任务运行失败才会调用。
允许失败
当批处理中的任务运行失败,Laravel 会自动将对应批处理标记为「已取消」。如果需要的话,你可以取消这个行为,这样一来,任务运行失败后就不会自动将批处理标记为已取消,这可以在分发批处理时通过调用 allowFailures
方法来实现:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->allowFailures()->dispatch();
重试失败的批处理任务
为了方便操作,Laravel 提供了一个 Artisan 命令 queue:retry-batch
用于重试给定批处理下的所有失败任务,你可以通过传递批处理的 UUID 来指定对应的批处理:
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
队列闭包
除了将任务类推送到队列之外,还可以推送闭包到队列。这对于需要在当前请求生命周期之外执行的简单快捷的任务来说非常方便。推送闭包到队列时,闭包的代码内容以加密方式签名,所以不会在传输过程中被篡改:
$podcast = App\Models\Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});
你可以通过 catch
方法提供一个闭包函数,该闭包函数会在队列闭包执行失败(配置的队列任务重试次数都用完了仍然没有成功)时调用:
use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// 队列任务执行失败...
});
运行队列进程
Laravel 自带了一个队列进程用来处理被推送到队列的新任务。你可以使用 queue:work
命令运行这个队列进程。请注意,队列进程开始运行后,会持续监听队列,直至你手动停止或关闭终端:
php artisan queue:work
注:为了保持队列进程
queue:work
持续在后台运行,需要使用进程守护程序,比如 Supervisor 来确保队列进程持续运行。
请记住,队列进程是长生命周期的进程,会在启动后驻留内存。若应用有任何改动将不会影响到已经启动的进程。所以请在发布程序后,重启队列进程。
此外,你还可以运行 queue:listen
命令。使用该命令时,代码修改后不需要手动重启队列进程,不过,该命令性能不及 queue:work
:
php artisan queue:listen
指定连接 & 队列
队列进程同样可以自定义连接和队列。传递给 work
命令的连接名需要与配置文件 config/queue.php
中定义的某个连接配置相匹配:
php artisan queue:work redis
你可以自定义将某个队列进程指定某个连接来管理。举例来说,如果所有的邮件任务都是通过 redis
连接上的 emails
队列处理,那么可以用以下命令来启动单一进程只处理单一队列:
php artisan queue:work redis --queue=emails
处理指定数量的任务
--once
选项可用于告知进程只处理队列中的单个任务:
php artisan queue:work --once
--max-jobs
选项用于告知进程处理给定数量的任务然后退出。该选项在与 Supervisor 结合时很有用,因为队列进程可以在处理完指定数量的任务后重新启动:
php artisan queue:work --max-jobs=1000
处理所有队列任务然后退出
--stop-when-empty
选项可用于告知进程处理所有任务然后优雅退出。当我们在 Docker 容器中处理 Laravel 队列时,如果你想要在队列为空时关闭容器,则该选项很有用:
php artisan queue:work --stop-when-empty
在给定时间内处理任务
--max-time
选项可用于告知队列进程在给定时间内处理任务然后退出。该选项在与 Supervisor 结合时很有用,因为队列进程可以在给定时间内的处理任务后重新启动:
// 处理任务 1 小时然后退出...
php artisan queue:work --max-time=3600
资源注意事项
后台队列进程不会在处理每个任务前重启框架,因此你需要在每次任务完成后释放所有重量级的资源。例如,如果你在使用 GD 库处理图片,需要在完成的时候使用 imagedestroy
来释放内存。
队列优先级
有时候你需要区分任务的优先级。比如,在配置文件 config/queue.php
中,你可以定义 redis
连接的默认 queue
为 low
。不过,如果需要将任务分发到高优先级 high
,可以这么做:
dispatch((new Job)->onQueue('high'));
如果期望所有 high
高优先级的队列都将先于 low
低优先级的任务执行,可以像这样启动队列进程:
php artisan queue:work --queue=high,low
队列进程 & 部署
前文已经提到队列进程是长生命周期的进程,在重启以前,所有源码的修改并不会对其产生影响。所以,最简单的方法是在每次发布新版本后重新启动队列进程。你可以通过 Aritisan 命令 queue:restart
来优雅地重启队列进程:
php artisan queue:restart
该命令将在队列进程完成正在进行的任务后,结束该进程,避免队列任务的丢失或错误。由于队列进程会在执行 queue:restart
命令后死掉,你仍然需要通过进程守护程序如Supervisor 来自动重启队列进程。
注:队列使用缓存来存储重启信号,所以在使用此功能前你需要验证缓存驱动配置正确。
任务过期 & 超时
任务过期
在配置文件 config/queue.php
中,每个连接都定义了 retry_after
项。该配置项的目的是定义任务在执行以后多少秒后释放回队列。如果retry_after
设定的值为 90
, 任务在运行 90
秒后还未完成,那么将被释放回队列而不是删除掉。毫无疑问,你需要把 retry_after
的值设定为任务执行时间的最大可能值。
注:只有 Amazon SQS 配置信息不包含
retry_after
项。Amazon SQS 的任务执行时间基于 Default Visibility Timeout ,该项在 Amazon AWS 控制台配置。
队列进程超时
队列进程 queue:work
可以设定超时 --timeout
项。该 --timeout
控制队列进程执行每个任务的最长时间,如果超时,该进程将被关闭。各种错误都可能导致某个任务处于“冻结”状态,比如 HTTP 无响应等。队列进程超时就是为了将这些“冻结”的进程关闭:
php artisan queue:work --timeout=60
配置项 retry_after
和 Aritisan 参数项 --timeout
不同,但目的都是为了确保任务的安全,并且只被成功的执行一次。
注:参数项
--timeout
的值应该始终小于配置项retry_after
的值,这是为了确保队列进程总在任务重试以前关闭。如果--timeout
比retry_after
大,那么你的任务可能被执行两次。
进程休眠时间
当任务在队列中有效时,进程会持续处理任务,没有延迟。不过,我们可以使用 sleep
配置项来指定没有新的有效任务产生时的休眠时间。休眠期间,队列进程不会处理任何新任务直到队列进程醒来:
php artisan queue:work --sleep=3
配置 Supervisor
安装 Supervisor
Supervisor 是 Linux 系统中常用的进程守护程序。如果队列进程 queue:work
意外关闭,它会自动重启启动队列进程。在 Ubuntu 安装Supervisor 非常简单:
sudo apt-get install supervisor
注:如果自己配置 Supervisor 有困难,可以考虑使用 Laravel Forge,它会为 Laravel 项目自动安装并配置 Supervisor。
配置 Supervisor
Supervisor 配置文件通常存放在 /etc/supervisor/conf.d
目录,在该目录下,可以创建多个配置文件指示 Supervisor
如何监视进程,例如,让我们创建一个开启并监视 queue:work
进程的 laravel-worker.conf
文件:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/project/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600
注:你需要将
/path/to/project
替换成自己项目在服务器的绝对地址。
在本例中,numprocs
指令让 Supervisor 运行 8 个 queue:work
进程并监视它们,如果失败的话自动重启。当然,你需要修改 queue:work sqs
的 command
指令来映射你的队列连接。
注:你需要确保
stopwaitsecs
的值大于最长运行任务的秒数,否则的话,Supervisor 会在该任务处理完成前杀死对应的任务进程。
启动 Supervisor
当成功创建配置文件后,需要刷新 Supervisor 的配置信息并使用如下命令启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
你可以通过 Supervisor 官方文档获取更多信息。
处理失败的任务
不可避免会出现运行失败的任务。你不必为此担心,Laravel 可以轻松设置任务允许的最大尝试次数,若是执行次数达到该限定,该任务会被插入到 failed_jobs
表,要创建一个 failed_jobs
表的迁移,可以使用 queue:failed-table
命令
php artisan queue:failed-table
php artisan migrate
然后,运行队列进程时,通过 --tries
参数项来设置队列任务允许的最大尝试次数,如果没有指定 --tries
选项的值,任务会被无限期重试:
php artisan queue:work redis --tries=3
此外,你还可以通过 --backoff
选项指定重试失败任务之前需要等待多少秒,默认情况下,失败队列任务会立即重试:
php artisan queue:work redis --tries=3 --backoff=3
如果你想要为指定任务配置失败重试延迟时间,可以在对应的队列任务类中定义 backoff
属性:
/**
* The number of seconds to wait before retrying the job.
*
* @var int
*/
public $backoff = 3;
如果你需要更复杂的逻辑来判断重试延迟,可以在队列任务类中定义一个 backoff
方法:
/**
* Calculate the number of seconds to wait before retrying the job.
*
* @return int
*/
public function backoff()
{
return 3;
}
你可以通过从 backoff
方法返回一个数组来配置「跳跃式」的重试延迟时间。在这个例子中,重试延迟第一次是 1s,第二次是 5s,第三次是 10s:
/**
* Calculate the number of seconds to wait before retrying the job.
*
* @return array
*/
public function backoff()
{
return [1, 5, 10];
}
清理失败的任务
你可以在任务类中定义 failed
方法, 从而允许你在失败发生时执行指定的动作,比如发送任务失败的通知,记录日志等。导致任务失败的 Exception
会被传递到 failed
方法:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* Create a new job instance.
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* Execute the job.
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// Process uploaded podcast...
}
/**
* The job failed to process.
*
* @param Throwable $exception
* @return void
*/
public function failed(Throwable $exception)
{
// 发送失败通知, etc...
}
}
任务失败事件
如果你期望在任务失败的时候触发某个事件,可以使用 Queue::failing
方法。该事件通过邮件或Slack通知团队。举个例子,我么可以在 Laravel 自带的 AppServiceProvider
中添加一个回调到该事件:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;
class AppServiceProvider extends ServiceProvider
{
/**
* 启动应用服务.
*
* @return void
*/
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
/**
* 注册服务提供者.
*
* @return void
*/
public function register()
{
//
}
}
重试失败的任务
要查看已插入到 failed_jobs
数据表中的所有失败任务,可以使用 Artisan 命 queue:failed
:
php artisan queue:failed
该命令将会列出任务 ID、连接、队列和失败时间,任务 ID 可用于重试失败任务,例如,要重试一个 ID 为 5
的失败任务,可以运行下面的命令:
php artisan queue:retry 5
如果需要的话,还可以传递多个 ID 或者 ID 区间(数字 ID)到这个命令:
php artisan queue:retry 5 6 7 8 9 10
php artisan queue:retry --range=5-10
要重试所有失败任务,传递 all
作为 ID 字段到 queue:retry
命令即可:
php artisan queue:retry all
如果你要删除一个失败任务,可以使用 queue:forget
命令:
php artisan queue:forget 5
要删除所有失败任务,可以使用 queue:flush
命令:
php artisan queue:flush
忽略缺失的模型
当注入一个 Eloquent 模型到队列任务时,它会在被放到队列之前自动序列化,然后在任务被处理时恢复。不过,如果该模型实例在任务等待被处理期间被删除,对应任务在执行时会失败并抛出 ModelNotFoundException
异常。
为了方便起见,你可以通过设置队列任务的 deleteWhenMissingModels
属性为 true
来选择自动删除缺失模型实例的任务:
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
任务事件
通过 Queue
门面提供的 before
和 after
方法可以在任务被处理之前或之后指定要执行的回调。这些回调可用来记录日志或者记录统计数据。通常,你可以在服务提供者中使用这些方法。比如,我们可以在AppServiceProvider
中这样用:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* Bootstrap any application services.
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
/**
* Register the service provider.
*
* @return void
*/
public function register()
{
//
}
}
使用 Queue
门面上的 looping
方法,你可以在进程尝试从队列中获取任务之前指定要执行的回调。例如,你可以注册一个闭包来回滚之前失败任务遗留下来的事务:
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});
5 条评论
内容有点多, 看了一个下午,有些蒙~~
可以结合这个系列看:https://laravelacademy.org/books/laravel-queue-action
👌
请问下 如果一个项目A是laravel的队列 另一些项目是非Laravel或laravel的项目, 如果要获取到laravel项目A队列 1. 队列的字符串前缀还好说 那么 队列中的数据结构呢? laravel队列的数据结构太复杂 本来就在队列中放个简单的数据 如字符串或数字的, 结果你个傻逼laravel给我整了一大堆没用的复杂的数据结构在队列里面 , 在另一些项目我还得去费脑子去把你队列的数据结构搞清楚 2. 如果我的另一些项目想往laravel项目中的队列去放数据 还是得搞懂你的队列前缀 以及封装的数据结构 在处理方法handle里面才能处理 你个傻逼玩意过度封装 3. 如果我的另一个项目也是laravel的想往Laravel项目A的放队列放数据 最简的方式还得把laravel队列的类放到另一个laravel项目中 然后把handle方法中的实现整成空方法 再来触发 队列类的dispatch方法
内容这样铺排下来,看得好困,都是一些伪代码,懵逼中