基于 Swoole 在 Laravel 中实现异步任务队列


Swoole 异步任务

实现原理

我们知道,PHP 本身的设计是同步阻塞的,不支持多线程和异步 IO,所以当我们执行一些耗时的操作,比如发送广播,或者邮件,如果直接在当前进程中操作,会导致服务器响应变慢,因此要借助一些第三方服务来处理以实现异步功能,比如队列,而 Swoole 作为 PHP 异步网络通信引擎,自然也对异步任务处理提供了支持,其底层的实现原理和常见的异步队列类似:将耗时任务投递到 TaskWorker 进程池后返回(相应任务会通过 TaskWorker 异步执行,执行成功后可以调用事先注册的回调函数进行后续处理),继续后续业务逻辑的执行,而不影响当前请求的处理速度。关于 TaskWorker 后面我们介绍 Swoole 底层原理的时候还会详细介绍。

示例代码

我们可以基于 Swoole 入门教程中编写的 TCP 服务器为基础来实现一个异步任务服务器用来处理异步任务,只需添加一个任务处理和一个任务完成回调即可,此外,还需要配置 TaskWorker 进程数以保证任务处理的速度,可以根据任务的耗时和任务量配置其值,通常我们可以将其配置为 CPU 数量的两倍:

<?php

$server = new \Swoole\Server("127.0.0.1", 9503);

// 设置异步任务的工作进程数量
$server->set(array('task_worker_num' => 4));

//收到请求时触发
$server->on('receive', function(\Swoole\Server $server, $fd, $from_id, $data) {
    //投递异步任务
    $task_id = $server->task($data);
    echo "异步任务投递成功: id=$task_id\n";
     $server->send($fd, "数据已接收,处理中...");
});

// 处理异步任务
$server->on('task', function (\Swoole\Server $server, $task_id, $from_id, $data) {
    echo "新的待处理异步任务[id=$task_id]".PHP_EOL;
    // todo 处理异步任务
    // 返回任务执行的结果
    $server->finish("$data -> OK");
});

// 处理异步任务的结果
$server->on('finish', function (\Swoole\Server $server, $task_id, $data) {
    echo "异步任务[$task_id] 处理完成: $data".PHP_EOL;
});

$server->start();

我们将该文件保存为 async_task_server.php,然后在命令行启动这个服务器:

php async_task_server.php

接下来,启动在 Swoole 入门教程中编写的 TCP 客户端

php tcp_client.php

服务器投递任务到 TaskWorker 后会立即将处理信息发送给客户端:

然后服务器会异步处理任务:

在 Laravel 中基于 Swoole 实现异步任务队列

还是以 LaravelS 扩展包为基础,我们在 Laravel 项目中实现基于 Swoole 实现异步任务队列功能,其原理就是上面介绍的 Swoole 异步任务,只不过该扩展包对其做了封装而已。

编写任务类

首先我们创建一个继承自 Hhxsv5\LaravelS\Swoole\Task\Task 基类的待处理任务类 TestTask

<?php
namespace App\Jobs;

use Hhxsv5\LaravelS\Swoole\Task\Task;
use Illuminate\Support\Facades\Log;

class TestTask extends Task
{
    // 待处理任务数据
    private $data;

    // 任务处理结果
    private $result;

    public function __construct($data)
    {
        $this->data = $data;
    }

    // 任务投递调用 task 回调时触发,等同于 Swoole 中的 onTask 逻辑
    public function handle()
    {
        Log::info(__CLASS__ . ': 开始处理任务', [$this->data]);
        //  todo 耗时任务具体处理逻辑在这里编写
        sleep(3); // 模拟任务需要3秒才能执行完毕
        $this->result = 'The result of ' . $this->data . ' is balabalabala';
    }

    // 任务完成调用 finish 回调时触发,等同于 Swoole 中的 onFinish 逻辑
    public function finish()
    {
        Log::info(__CLASS__ . ': 任务处理完成', [$this->result]);
        // 可以在这里触发后续要执行的任务,或者执行其他善后逻辑
    }
}

编写测试代码

然后在 routes/web.php 编写投递异步任务的测试代码如下:

Route::get('/task/test', function () {
    $task = new \App\Jobs\TestTask('测试异步任务');
    $success = \Hhxsv5\LaravelS\Swoole\Task\Task::deliver($task);  // 异步投递任务,触发调用任务类的 handle 方法
    var_dump($success);
});

修改配置文件

此外还要在配置文件 config/laravels.php 中取消 task_worker_num 配置项前面的注释:

'swoole' => [
    ...
    'task_worker_num'    => function_exists('swoole_cpu_num') ? swoole_cpu_num() * 2 : 8,
    ...
]

测试异步任务执行

接下来,我们重启启动 Swoole 服务器(基于 Swoole HTTP 服务器访问路由才能成功投递异步任务):

php bin/laravels restart

然后在浏览器中通过 http://todo-s.test/task/test 访问测试路由,页面立即显示投递成功:

bool(true)

然后我们去 storage/logs 目录下查看最新的日志信息,可以看到任务执行其实耗费了 3 秒:

[2019-05-30 16:31:32] local.INFO: App\Jobs\TestTask: 开始处理任务 ["测试异步任务"] 
[2019-05-30 16:31:35] local.INFO: App\Jobs\TestTask: 任务处理完成 ["The result of 测试异步任务 is balabalabala"]

这样,我们就成功在 Laravel 项目中基于 Swoole 实现了异步任务消费队列,很简单吧。


Vote Vote Cancel Collect Collect Cancel

<< 上一篇: 基于 Swoole 定时器实现毫秒级任务调度

>> 下一篇: 基于 Swoole 在 Laravel 中实现异步事件监听及处理