基于 Swoole 实现协程篇(二):通过协程实现并发编程


Swoole 内置了丰富的协程组件供开发者直接调用以便快速实现异步非阻塞的并发编程,省去了开发者自己实现相应底层代码的麻烦:

在协程 Server 中使用对应的协程版 Client 来实现全异步 Server,同时 Swoole 提供了协程工具集:Swoole\Coroutine,提供了获取当前协程ID、反射调用等能力。

通过 setDefer 机制实现并发编程

我们以 Redis 和 MySQL 客户端请求为例,使用上述 Swoole\Coroutine\RedisSwoole\Coroutine\MySQL 组件,可以实现异步 Redis 和 MySQL 客户端:

<?php

$server = new \Swoole\Http\Server('127.0.0.1', 9588);

$server->on('Request', function ($request, $response) {

    var_dump(time());

    $mysql = new Swoole\Coroutine\MySQL();
    $mysql->connect([
        'host' => '127.0.0.1',
        'user' => 'root',
        'password' => 'root',
        'database' => 'laravel58',
    ]);
    $mysql->setDefer();
    $mysql->query('select sleep(3)');

    var_dump(time());

    $redis1 = new Swoole\Coroutine\Redis();
    $redis1->connect('127.0.0.1', 6379);
    $redis1->setDefer();
    $redis1->set('hello', 'world');

    var_dump(time());

    $redis2 = new Swoole\Coroutine\Redis();
    $redis2->connect('127.0.0.1', 6379);
    $redis2->setDefer();
    $redis2->get('hello');

    $result1 = $mysql->recv();
    $result2 = $redis2->recv();

    var_dump($result1, $result2, time());

    $response->end('Request Finish: ' . time());
});

$server->start();

由于 Swoole 会在 TCP Server 和 HTTP Server 回调函数中会自动开启协程,所以不需要显式通过 go 关键字启动协程,然后我们可以在回调函数中使用 MySQL 和 Redis 客户端协程组件发起请求。

要理解上述代码的运行原理,需要先了解协程的 setDefer 机制,绝大部分协程组件都支持 setDefer,该机制可以将请求响应式的接口拆分为两个步骤:先发送数据, 再并发收取响应结果。

由于大多数情况下,「建立连接和发送数据的耗时」相较于「等待响应的耗时」来说可以忽略不计, 所以可以简单理解为 defer 模式下, 多个客户端的请求响应是并发的(实际上只有接收响应是并发的,建立连接和发送请求是串行的)。

以上述代码为例,设置 setDefer(true) 后,通过 Redis 或 MySQL 客户端发起请求,将不再等待服务器返回结果,而是在发送请求之后,立即返回 true。在此之后可以继续发起其他 Redis、MySQL 请求,最后再使用 recv() 方法接收响应内容。

我们将上述代码保存到 coroutine/http.php,然后在终端启动这个 HTTP 服务端:

php coroutine/http.php

接下来,在 Postman 中对服务端发起请求,会在等待几秒后看到返回的响应内容:

Postman+Swoole+HTTP Server

此时,可以在终端看到服务端打印的内容:

Swoole 基于 defer 实现并发编程

前三个时间分别是 mysqlredis1redis2 三个客户端发起请求的时间,可以看到,尽管 mysql 中会休眠 3 秒,但是通过 defer 机制实现了三个请求的并发执行。

通过子协程+通道实现并发编程

除了 setDefer 机制外,Swoole 还支持通过子协程+通道实现并发编程,下面我们通过子协程+通道的方式来改写上面的代码实现:

<?php

$server = new \Swoole\Http\Server('127.0.0.1', 9588);

$server->on('Request', function ($request, $response) {

    $channel = new \Swoole\Coroutine\Channel(3);

    go(function () use ($channel) {
        var_dump(time());

        $mysql = new Swoole\Coroutine\MySQL();
        $mysql->connect([
            'host' => '127.0.0.1',
            'user' => 'root',
            'password' => 'root',
            'database' => 'laravel58',
        ]);
        $result = $mysql->query('select sleep(3)');

        $channel->push($result);
    });

    go(function () use ($channel) {
        var_dump(time());

        $redis1 = new Swoole\Coroutine\Redis();
        $redis1->connect('127.0.0.1', 6379);
        $result = $redis1->set('hello', 'world');

        $channel->push($result);
    });

    go(function () use ($channel) {
        var_dump(time());

        $redis2 = new Swoole\Coroutine\Redis();
        $redis2->connect('127.0.0.1', 6379);

        $result = $redis2->get('hello');
        $channel->push($result);
    });

    $results = [];
    for ($i = 0; $i < 3; $i++) {
        $results[] = $channel->pop();
    }

    $response->end(json_encode([
        'data' => $results,
        'time' => time()
    ]));
});

$server->start();

我们将 MySQL 和 Redis 客户端连接请求调用改写为通过三个子协程实现,同时去掉 setDefer 设置,因为这三个子协程已经是并发调用了,此外,由于三个子协程之间数据是相互隔离的,所以我们通过 Swoole\Coroutine\Channel (即通道)实现协程之间的数据共享和通信,初始化其缓冲空间为 3,然后通过 use 方式将其引入到子协程中,把响应结果通过 push 方法放到 Channel 里面,接下来在服务端 onRequest 回调函数末尾通过一个循环将 Channel 中的数据通过 pop 方法依次取出来放到数组 $results 中,最后通过 $response->end() 方法将结果以 JSON 格式返回给客户端。

我们将上述代码保存到 coroutine/http2.php,然后在终端通过如下命令启动这个新的 HTTP 服务端:

php coroutine/http2.php

还是在 Postman 中请求这个服务端,将响应格式调整为 JSON,会看到结果如下:

Postman+Swoole

由于 MySQL 请求执行耗时最长,所以位置最靠后。在启动服务器的终端,可以看到打印出的三个客户端请求时间,完全一致,说明它们是并发执行的:

 Swoole 基于子协程+通道实现并发编程

显然,通过子协程 + 通道还可以很方便的实现 Redis、MySQL 连接池,相信看完这个示例,你应该可以很快领会这个连接池怎么实现。


Vote Vote Cancel Collect Collect Cancel

<< 上一篇: 基于 Swoole 实现协程篇(一):基本概念和底层原理

>> 下一篇: 基于 Swoole 实现协程篇(三):在 Laravel 框架中使用协程