基于 Swoole 在 Laravel 中实现异步任务队列
Swoole 异步任务
实现原理
我们知道,PHP 本身的设计是同步阻塞的,不支持多线程和异步 IO,所以当我们执行一些耗时的操作,比如发送广播,或者邮件,如果直接在当前进程中操作,会导致服务器响应变慢,因此要借助一些第三方服务来处理以实现异步功能,比如队列,而 Swoole 作为 PHP 异步网络通信引擎,自然也对异步任务处理提供了支持,其底层的实现原理和常见的异步队列类似:将耗时任务投递到 TaskWorker 进程池后返回(相应任务会通过 TaskWorker 异步执行,执行成功后可以调用事先注册的回调函数进行后续处理),继续后续业务逻辑的执行,而不影响当前请求的处理速度。关于 TaskWorker 后面我们介绍 Swoole 底层原理的时候还会详细介绍。
示例代码
我们可以基于 Swoole 入门教程中编写的 TCP 服务器为基础来实现一个异步任务服务器用来处理异步任务,只需添加一个任务处理和一个任务完成回调即可,此外,还需要配置 TaskWorker 进程数以保证任务处理的速度,可以根据任务的耗时和任务量配置其值,通常我们可以将其配置为 CPU 数量的两倍:
<?php
$server = new \Swoole\Server("127.0.0.1", 9503);
// 设置异步任务的工作进程数量
$server->set(array('task_worker_num' => 4));
//收到请求时触发
$server->on('receive', function(\Swoole\Server $server, $fd, $from_id, $data) {
//投递异步任务
$task_id = $server->task($data);
echo "异步任务投递成功: id=$task_id\n";
$server->send($fd, "数据已接收,处理中...");
});
// 处理异步任务
$server->on('task', function (\Swoole\Server $server, $task_id, $from_id, $data) {
echo "新的待处理异步任务[id=$task_id]".PHP_EOL;
// todo 处理异步任务
// 返回任务执行的结果
$server->finish("$data -> OK");
});
// 处理异步任务的结果
$server->on('finish', function (\Swoole\Server $server, $task_id, $data) {
echo "异步任务[$task_id] 处理完成: $data".PHP_EOL;
});
$server->start();
我们将该文件保存为 async_task_server.php
,然后在命令行启动这个服务器:
php async_task_server.php
接下来,启动在 Swoole 入门教程中编写的 TCP 客户端:
php tcp_client.php
服务器投递任务到 TaskWorker 后会立即将处理信息发送给客户端:
然后服务器会异步处理任务:
在 Laravel 中基于 Swoole 实现异步任务队列
还是以 LaravelS 扩展包为基础,我们在 Laravel 项目中实现基于 Swoole 实现异步任务队列功能,其原理就是上面介绍的 Swoole 异步任务,只不过该扩展包对其做了封装而已。
编写任务类
首先我们创建一个继承自 Hhxsv5\LaravelS\Swoole\Task\Task
基类的待处理任务类 TestTask
:
<?php
namespace App\Jobs;
use Hhxsv5\LaravelS\Swoole\Task\Task;
use Illuminate\Support\Facades\Log;
class TestTask extends Task
{
// 待处理任务数据
private $data;
// 任务处理结果
private $result;
public function __construct($data)
{
$this->data = $data;
}
// 任务投递调用 task 回调时触发,等同于 Swoole 中的 onTask 逻辑
public function handle()
{
Log::info(__CLASS__ . ': 开始处理任务', [$this->data]);
// todo 耗时任务具体处理逻辑在这里编写
sleep(3); // 模拟任务需要3秒才能执行完毕
$this->result = 'The result of ' . $this->data . ' is balabalabala';
}
// 任务完成调用 finish 回调时触发,等同于 Swoole 中的 onFinish 逻辑
public function finish()
{
Log::info(__CLASS__ . ': 任务处理完成', [$this->result]);
// 可以在这里触发后续要执行的任务,或者执行其他善后逻辑
}
}
编写测试代码
然后在 routes/web.php
编写投递异步任务的测试代码如下:
Route::get('/task/test', function () {
$task = new \App\Jobs\TestTask('测试异步任务');
$success = \Hhxsv5\LaravelS\Swoole\Task\Task::deliver($task); // 异步投递任务,触发调用任务类的 handle 方法
var_dump($success);
});
修改配置文件
此外还要在配置文件 config/laravels.php
中取消 task_worker_num
配置项前面的注释:
'swoole' => [
...
'task_worker_num' => function_exists('swoole_cpu_num') ? swoole_cpu_num() * 2 : 8,
...
]
测试异步任务执行
接下来,我们重启启动 Swoole 服务器(基于 Swoole HTTP 服务器访问路由才能成功投递异步任务):
php bin/laravels restart
然后在浏览器中通过 http://todo-s.test/task/test
访问测试路由,页面立即显示投递成功:
bool(true)
然后我们去 storage/logs
目录下查看最新的日志信息,可以看到任务执行其实耗费了 3 秒:
[2019-05-30 16:31:32] local.INFO: App\Jobs\TestTask: 开始处理任务 ["测试异步任务"]
[2019-05-30 16:31:35] local.INFO: App\Jobs\TestTask: 任务处理完成 ["The result of 测试异步任务 is balabalabala"]
这样,我们就成功在 Laravel 项目中基于 Swoole 实现了异步任务消费队列,很简单吧。
22 Comments
按例子nginx访问这个路由时候显示Class swoole does not exist。是需要nginx将请求转发到swoole吗?不太懂,望赐教~
嗯 是的
还有一个问题请赐教下。。研究了一整天了不太明白,就是启动swoole以后,nginx将请求转发给swoole_server了。控制器里我定义了一个属性 public $msg,我发现在控制器里请求$this->msg赋值的信息,在第二次请求的时候还在。。附上代码
//第一次请求打印为空,第二次请求打印 测试信息 重启服务后正常。这是为啥。。。是gc问题吗
是的,Swoole 服务器之所以性能比 PHP-FPM 好,就是它会持久化实例,PHP-FPM 每次请求都会重新初始化所有对象实例,而 Swoole 服务器启动后,对同一个 Worker 进程来说,对象实例只初始化一次,下次请求还会复用上次的实例,除非你手动回收或注销,否则它会一直存在,后面会单独介绍 Swoole 底层运行原理会讲到这个
你好,学院君,冒昧的问一下,如果有大量的这种异步大的操作,swoole的task是怎么处理的,还是说对于这种情况来需要设置足够的task进程?但是设置很多线程,这种又消耗很多内存,困惑中,谢谢
还可以用协程啊 协程是轻量级的用户级线程 上万个都没啥问题
另外 TaskWorker 进程处理异步任务 需要设置工作进程数的 可以根据任务量和耗时设置合理的数量 不要超过 CPU 内核数 肯定不是无限开进程的 否则 Swoole 的设计就有问题了 操作系统支持的进程和线程都是有上限的 可以关注下 C10K 问题
我也碰到这个问题,暂没解决
PHP Swoole 扩展安装了吗 可以通过
php -m
命令查看下swoole已经安装,而且本例中的直接命令行运行php async_task_server.php 和 php tcp_client.php可以跑通,但laravel(v5.8)启动后报错,感觉是
app('swoole')
这个函数报错,swoole对像未绑定