通过 Process 模块在 PHP 中实现多进程(一):简单的多进程 TCP 服务器实现


Process vs PCNTL

我们知道,通过 PHP 自带的扩展 PCNTL 可以实现基于 Unix 的多进程管理(不能用于 Web 服务器环境),比如 Laravel 自带的队列系列解决方案 Horizon 就会用到这个扩展,但是 PCNTL 有很多坑,比如:

  • 没有提供进程间通信的功能;
  • 不支持重定向标准输入和输出;
  • 只提供了 fork 这样原始的接口,容易导致使用错误。

因此,从 1.7.2 版本开始,Swoole 内置了一个进程管理模块 Process 来替代 PHP 自带的 PCNTL 扩展,和 PCNTL 一样,Process 模块也只能在命令行使用,但提供了比 PCNTL 扩展更强大的功能,更容易上手的 API,使 PHP 在多进程编程方面更加轻松。Process 模块主要提供了以下特性:

  • 支持基于 Unix Socket 和 sysvmsg 消息队列的进程间通信,只需调用 write/read 或者 push/pop 即可;
  • 支持重定向标准输入和输出,在子进程内 echo 不会打印到屏幕,而是写入管道,读键盘输入时可以重定向为管道读取数据;
  • 配合 Event 模块,创建的 PHP 子进程支持异步事件驱动模式;
  • 提供了 exec 接口,创建的进程可以执行其他程序,与原 PHP 父进程之间可以方便的通信。

下面我们就来看看如果通过 Swoole 的 Process 模块在 PHP 中实现多进程管理,同时演示上上述特性的实现。

多进程 TCP 服务器实现

我们以一个模拟 TCP 服务器为例,演示基于 Swoole 的多进程服务器实现,在这段代码中,主进程启动后,会启动多个子进程(具体支持的子进程数通过参数设置),处理客户端连接和请求消息,子进程退出后,主进程会重新创建新的子进程,如果主进程退出,则子进程在处理完当前请求后,也会退出。

对应的 TCP 服务器实现代码如下:

<?php
namespace Swoole;

class TcpServer
{
    // 系统支持的最大子进程数
    const MAX_PROCESS = 3;
    // 子进程pid数组
    private $pids = [];
    // 网络套接字
    private $socket;
    // 主进程 ID
    private $mpid;

    /**
     * 服务器主进程业务逻辑
     */
    public function run()
    {
        // 主进程
        $process = new Process(function () {
            // 获取当前进程 ID 作为主进程 ID
            $this->mpid = posix_getpid();
            echo time() . " Master process, pid {$this->mpid}\n";

            // 创建 TCP 服务器并获取套接字
            $this->socket = stream_socket_server("tcp://127.0.0.1:9503", $errno, $errstr);
            if (!$this->socket) {
                exit("Server start error: $errstr --- $errno");
            }

            // 启动子进程处理请求
            for ($i = 0; $i < self::MAX_PROCESS; $i++) {
                $this->startWorkerProcess();
            }

            echo "Waiting client start...\n";

            // 主进程等待子进程退出,必须是死循环
            while (1) {
                foreach ($this->pids as $k => $pid) {
                    if ($pid) {
                        // 回收结束运行的子进程,以避免僵尸进程出现
                        $ret = Process::wait(false);
                        if ($ret) {
                            echo time() . " Worker process $pid exit, will start new... \n";
                            // 子进程退出后重新启动一个新的子进程
                            $this->startWorkerProcess();
                            unset($this->pids[$k]);
                        }
                    }
                }
                sleep(1); //让出 1s 时间给CPU
            }
        }, false, false); //不启用管道通信
        // 让当前进程变成一个守护进程
        Process::daemon();
        // 执行 fork 系统调用,启动进程
        // 注意:start 之后的变量子进程里面是获取不到的
        $process->start();
    }

    /**
     * 创建worker子进程,接收客户端连接并处理
     */
    private function startWorkerProcess()
    {
        // 子进程
        $process = new Process(function (Process $worker) {
            // 子进程业务逻辑
            $this->acceptClient($worker);
        }, false, false);
        // 启动子进程并获取子进程 ID
        $pid = $process->start();
        $this->pids[] = $pid;
    }

    /**
     * 等待客户端连接并处理
     * @param Process $worker
     */
    private function acceptClient(&$worker)
    {
        //子进程一直等待客户端连接,不能退出
        while (1) {
            // 从主进程创建的网络套接字上获取连接
            $conn = stream_socket_accept($this->socket, -1);
            // 如果定义了连接建立回调函数,则在连接上执行该回调
            if ($this->onConnect) {
                call_user_func($this->onConnect, $conn);
            }

            // 开始循环读取客户端请求消息
            $recv = ''; // 实际收到的消息
            $buffer = ''; // 缓冲消息
            while (1) {
                // 检查主进程是否正常,不正常则退出子进程
                $this->checkMpid($worker);
                // 读取客户端请求消息
                $buffer = fread($conn, 20);

                // 没有收到正常消息
                if ($buffer === false || $buffer === '') {
                    // 如果服务器设置了连接关闭回调函数,则在当前连接上执行该回调
                    if ($this->onClose) {
                        call_user_func($this->onClose, $conn);
                    }
                    // 结束读取消息,退出当前循环,等待下一个客户端连接
                    break;
                }

                // 消息结束符的位置
                $pos = strpos($buffer, "\n");
                if ($pos === false) {  // 没有读取完,继续读取
                    $recv .= $buffer;
                } else {  // 读取完毕,开始处理请求消息
                    // 处理收到的消息
                    $recv .= trim(substr($buffer, 0, $pos + 1));

                    // 如果服务器定义了消息处理回调函数,则在当前连接上将消息传入回调函数并执行该回调
                    if ($this->onMessage) {
                        call_user_func($this->onMessage, $conn, $recv);
                    }

                    // 如果接收到 quit 消息,表示关闭此连接,等待下一个客户端连接
                    if ($recv == "quit") {
                        echo "Client close connection\n";
                        fclose($conn);
                        break;
                    }

                    $recv = ''; // 清空消息,准备下一次接收
                }
            }
        }
    }

    /**
     * 如果主进程已退出,则子进程也退出,避免孤儿进程出现
     * @param Process $worker
     */
    public function checkMpid(&$worker)
    {
        // 检测主进程是否存在,如果不存在,则退出子进程
        if (!Process::kill($this->mpid, 0)) {
            $worker->exit();
            // 这句提示,实际是看不到的,需要写到日志中
            echo "Master process exited, I [{$worker['pid']}] also quit\n";
        }
    }
}

$server = new TcpServer();

// 定义连接建立回调函数
$server->onConnect = function ($conn) {
    echo "onConnect -- accepted " . stream_socket_get_name($conn, true) . "\n";
};

// 定义收到消息回调函数
$server->onMessage = function ($conn, $msg) {
    echo "onMessage --" . $msg . "\n";
    fwrite($conn, "received " . $msg . "\n");
};

// 定义连接关闭回调函数
$server->onClose = function ($conn) {
    echo "onClose --" . stream_socket_get_name($conn, true) . "\n";
};

// 启动服务器主进程
$server->run();

我给这段代码加了详尽的注释,如果你还有不清楚的地方,可以查看 Swoole 官方文档了解 Process 每个方法的使用明细。

显然这个引入了多进程支持的 TCP 服务器比我们之前 Swoole 入门示例中编写的 TCP 服务器功能要强大,同时可以支持三个客户端连接,并且会对工作进程进行一些简单的维护。

下面我们启动这个 TCP 服务器(假设上述代码保存到了 process/tcp_server.php 文件中),由于我们设置主进程以后台守护进程方式运行,所以启动成功之后就会将控制权交回给操作系统,我们可以通过 ps -ef | grep php 命令查看 TCP 服务器的运行情况:

Swoole 多进程 TCP 服务器

可以看到主进程 ID 是 56929,以及其它三个通过主进程 fork 出来的子进程(父进程 ID 是 56929),表明服务器启动成功。

然后我们以 Swoole 入门示例中创建的 TCP 客户端为例演示测试客户端与上述多进程 TCP 服务器的连接,新开一个命令行窗口启动 TCP 客户端,可以看到会输出服务器返回的数据:

Swoole TCP 客户端输出

在启动 TCP 服务器的窗口也会看到如下日志输出:

Swoole TCP 服务器输出

好了,这就是 Swoole 多进程模块 Process 的一个简单应用,你还可以将其用于其他需要用到多进程的地方以提高代码的并发处理能力。

注:以上 TCP 服务器示例代码整理自 PHP多进程系列笔记(五) 这篇教程。


Vote Vote Cancel Collect Collect Cancel

<< 上一篇: 基于 Swoole 在 Laravel 中实现异步事件监听及处理

>> 下一篇: 基于 Process 模块在 PHP 中实现多进程(二):进程间通信