基于 Swoole 实现协程篇(二):通过协程实现并发编程
Swoole 内置了丰富的协程组件供开发者直接调用以便快速实现异步非阻塞的并发编程,省去了开发者自己实现相应底层代码的麻烦:
- TCP/UDP Client:Swoole\Coroutine\Client
- TCP/UDP Server:Swoole\Coroutine\Server
- HTTP/WebSocket Client:Swoole\Coroutine\HTTP\Client
- HTTP/WebSocket Server:Swoole\Coroutine\HTTP\Server
- HTTP2 Client:Swoole\Coroutine\HTTP2\Client
- Redis Client:Swoole\Coroutine\Redis
- Mysql Client:Swoole\Coroutine\MySQL
- PostgreSQL Client:Swoole\Coroutine\PostgreSQL
在协程 Server 中使用对应的协程版 Client 来实现全异步 Server,同时 Swoole 提供了协程工具集:Swoole\Coroutine,提供了获取当前协程ID、反射调用等能力。
通过 setDefer 机制实现并发编程
我们以 Redis 和 MySQL 客户端请求为例,使用上述 Swoole\Coroutine\Redis
和 Swoole\Coroutine\MySQL
组件,可以实现异步 Redis 和 MySQL 客户端:
<?php
$server = new \Swoole\Http\Server('127.0.0.1', 9588);
$server->on('Request', function ($request, $response) {
var_dump(time());
$mysql = new Swoole\Coroutine\MySQL();
$mysql->connect([
'host' => '127.0.0.1',
'user' => 'root',
'password' => 'root',
'database' => 'laravel58',
]);
$mysql->setDefer();
$mysql->query('select sleep(3)');
var_dump(time());
$redis1 = new Swoole\Coroutine\Redis();
$redis1->connect('127.0.0.1', 6379);
$redis1->setDefer();
$redis1->set('hello', 'world');
var_dump(time());
$redis2 = new Swoole\Coroutine\Redis();
$redis2->connect('127.0.0.1', 6379);
$redis2->setDefer();
$redis2->get('hello');
$result1 = $mysql->recv();
$result2 = $redis2->recv();
var_dump($result1, $result2, time());
$response->end('Request Finish: ' . time());
});
$server->start();
由于 Swoole 会在 TCP Server 和 HTTP Server 回调函数中会自动开启协程,所以不需要显式通过 go 关键字启动协程,然后我们可以在回调函数中使用 MySQL 和 Redis 客户端协程组件发起请求。
要理解上述代码的运行原理,需要先了解协程的 setDefer 机制,绝大部分协程组件都支持 setDefer,该机制可以将请求响应式的接口拆分为两个步骤:先发送数据, 再并发收取响应结果。
由于大多数情况下,「建立连接和发送数据的耗时」相较于「等待响应的耗时」来说可以忽略不计, 所以可以简单理解为 defer 模式下, 多个客户端的请求响应是并发的(实际上只有接收响应是并发的,建立连接和发送请求是串行的)。
以上述代码为例,设置 setDefer(true)
后,通过 Redis 或 MySQL 客户端发起请求,将不再等待服务器返回结果,而是在发送请求之后,立即返回 true
。在此之后可以继续发起其他 Redis、MySQL 请求,最后再使用 recv()
方法接收响应内容。
我们将上述代码保存到 coroutine/http.php
,然后在终端启动这个 HTTP 服务端:
php coroutine/http.php
接下来,在 Postman 中对服务端发起请求,会在等待几秒后看到返回的响应内容:
此时,可以在终端看到服务端打印的内容:
前三个时间分别是 mysql
、redis1
、redis2
三个客户端发起请求的时间,可以看到,尽管 mysql
中会休眠 3 秒,但是通过 defer 机制实现了三个请求的并发执行。
通过子协程+通道实现并发编程
除了 setDefer 机制外,Swoole 还支持通过子协程+通道实现并发编程,下面我们通过子协程+通道的方式来改写上面的代码实现:
<?php
$server = new \Swoole\Http\Server('127.0.0.1', 9588);
$server->on('Request', function ($request, $response) {
$channel = new \Swoole\Coroutine\Channel(3);
go(function () use ($channel) {
var_dump(time());
$mysql = new Swoole\Coroutine\MySQL();
$mysql->connect([
'host' => '127.0.0.1',
'user' => 'root',
'password' => 'root',
'database' => 'laravel58',
]);
$result = $mysql->query('select sleep(3)');
$channel->push($result);
});
go(function () use ($channel) {
var_dump(time());
$redis1 = new Swoole\Coroutine\Redis();
$redis1->connect('127.0.0.1', 6379);
$result = $redis1->set('hello', 'world');
$channel->push($result);
});
go(function () use ($channel) {
var_dump(time());
$redis2 = new Swoole\Coroutine\Redis();
$redis2->connect('127.0.0.1', 6379);
$result = $redis2->get('hello');
$channel->push($result);
});
$results = [];
for ($i = 0; $i < 3; $i++) {
$results[] = $channel->pop();
}
$response->end(json_encode([
'data' => $results,
'time' => time()
]));
});
$server->start();
我们将 MySQL 和 Redis 客户端连接请求调用改写为通过三个子协程实现,同时去掉 setDefer 设置,因为这三个子协程已经是并发调用了,此外,由于三个子协程之间数据是相互隔离的,所以我们通过 Swoole\Coroutine\Channel (即通道)实现协程之间的数据共享和通信,初始化其缓冲空间为 3,然后通过 use 方式将其引入到子协程中,把响应结果通过 push
方法放到 Channel 里面,接下来在服务端 onRequest 回调函数末尾通过一个循环将 Channel 中的数据通过 pop
方法依次取出来放到数组 $results
中,最后通过 $response->end()
方法将结果以 JSON 格式返回给客户端。
我们将上述代码保存到 coroutine/http2.php
,然后在终端通过如下命令启动这个新的 HTTP 服务端:
php coroutine/http2.php
还是在 Postman 中请求这个服务端,将响应格式调整为 JSON,会看到结果如下:
由于 MySQL 请求执行耗时最长,所以位置最靠后。在启动服务器的终端,可以看到打印出的三个客户端请求时间,完全一致,说明它们是并发执行的:
显然,通过子协程 + 通道还可以很方便的实现 Redis、MySQL 连接池,相信看完这个示例,你应该可以很快领会这个连接池怎么实现。
10 条评论
通过setDefer实现并发编程的例子result2返回值为null是什么情况呢,我看redis1 set返回为true, redis2 get是返回true的 但是redis2->recv()返回为null了
result2 应该返回的是 hello 键对应的值 你可以 sleep 1 秒再看看
一样 但是用子协程+通道又没有问题
你的 PHP 和 Swoole 版本是多少
php:7.3.3 swoole:4.4.0
你看下配置的swoole和redis扩展是不是只加到了 cli/php.ini,没有加fpm/php.ini。
\Swoole\Coroutine\Redis::setDefer()方法需要传参
“所以我们通过 Swoole\Coroutine\Channel (即通道)实现协程之间的数据共享和通信” -- 我个人理解是onRequest 这个函数也是处在一个协程,所以他遍历其他协程的时候需要channel这个通道去获取其他三个子协程的数据。不然我不理解这个channel在这里有什么要共享的。请指点
onRequest 里面三个通过 go 函数启用的子协程之间共享,作为不同协程之间通信的载体,你看里面都用 $channel->push 推送数据到 channel 了,你也可以在协程里面读取 channel 上的数据
使用setdefer机制时,swoole/ide-helper提示 redis的setdefer()需要传参,这个是必要的么? redis的set()方法返回true,result2 返回结果是null,没有提示,对用于写入数据的redis客户端执行recv()方法后,返回结果变成了world,请问这是什么原因