内部进程间通讯

    目录

    为了方便进程内部通讯,进行数据交换。imi v1.2.0 版本新增了内部进程间通讯封装。

    我们只需要使用 Server::sendMessage() 发送,使用监听事件接收就行了。

    支持: Swoole、Workerman

    Swoole

    Worker 进程

    使用 Swoole 提供的 sendMessage()onPipeMessage 事件 实现。

    onPipeMessage 事件中,收到指定结构的数据,就会触发相应事件。

    类名:Imi\Swoole\Server\Server

    用户进程

    imi 实现的 Unix Socket 双向通信,Swoole 自带的 pipe 无法实现双向通信。

    向进程发送消息:

    use Imi\Swoole\Process\ProcessManager;
    $process = ProcessManager::getProcessWithManager('XXXProcess');
    
    // 返回 bool 类型
    $process->sendUnixSocketMessage('动作名');
    $process->sendUnixSocketMessage('动作名', 123); // 第二个参数可以带任意类型的变量,如果是对象必须可被序列化

    进程监听消息:

    监听事件:IMI.PROCESS.PIPE_MESSAGE

    事件参数类:\Imi\Swoole\Process\Event\Param\PipeMessageEventParam

    <?php
    use Imi\Bean\Annotation\Listener;
    use Imi\Event\EventParam;
    use Imi\Event\IEventListener;
    use Imi\Server\ServerManager;
    use Imi\Swoole\Process\Event\Param\PipeMessageEventParam;
    
    /**
     * @Listener(eventName="IMI.PROCESS.PIPE_MESSAGE")
     */
    class MyListener implements IEventListener
    {
        /**
         * @param PipeMessageEventParam $e
         */
        public function handle(EventParam $e): void
        {
            var_dump($e->action); // 获取动作名
            var_dump($e->data); // 获取数据
    
            // 返回发送方数据(非必须)
            $e->process->sendUnixSocketMessageByConnection($e->connection, '动作名');
            $e->process->sendUnixSocketMessageByConnection($e->connection, '动作名', 123); // 第二个参数可以带任意类型的变量,如果是对象必须可被序列化
        }
    }

    Workerman

    使用 Channel分布式通讯组件 实现。

    类名:Imi\Workerman\Server\Server

    介绍

    数据结构

    [
        'action'    =>  '动作名', // 此字段固定
        // 其它参数任意增加即可
    ]

    事件名称

    IMI.PIPE_MESSAGE.动作名

    代码示例

    发送

    use Imi\Swoole\Server\Server;
    
    // 发送给所有 Worker 进程
    Server::sendMessage('test', [
        'time'  =>  time(),
    ]);
    
    // 发送给 WorkerId 为 1 的进程
    Server::sendMessage('test', [
        'time'  =>  time(),
    ], 1);

    监听

    事件名称为:IMI.PIPE_MESSAGE.test

    <?php
    namespace App\Listener;
    
    use Imi\Event\EventParam;
    use Imi\Event\IEventListener;
    use Imi\Bean\Annotation\Listener;
    
    /**
     * @Listener("IMI.PIPE_MESSAGE.test")
     */
    class TestMessage implements IEventListener
    {
        /**
         * 事件处理方法
         * @param EventParam $e
         * @return void
         */
        public function handle(EventParam $e): void
        {
            $data = $e->getData()['data'];
            var_dump($data['time']); // 接收到了上面发送来的 time
        }
    
    }

    发送并获取返回数据

    思路:

    一般来讲,发送消息不像 http 请求,一定会有响应结果。

    但有时候,我们需要获取返回数据。

    办法很简单,比如:发送数据动作名为 testRequest,再定义一个 testResponse 动作监听用于接收数据即可。

    再使用 Channel 挂起协程等待响应结果,完美!

    暂时只有 Swoole 支持

    发送请求并等待响应:

    use Imi\Swoole\Server\Server;
    use Imi\Swoole\Util\Co\ChannelContainer;
    
    // 生成一个随机ID
    $id = uniqid('', true);
    
    try {
        $channel = ChannelContainer::getChannel($id);
        // 发送给 WorkerId 为 1 的进程
        Server::sendMessage('testRequest', [
            'time'  =>  time(),
        ], 1);
        // 通过 Channel 获取结果,超时时间可以自行设置,这里是 30 秒
        $result = $channel->pop(30);
        if(false === $result)
        {
            throw new \RuntimeException('Receive error');
        }
        var_dump($result['datetime']); // 返回结果
    } finally {
        ChannelContainer::removeChannel($id);
    }

    监听请求:

    事件名称为:IMI.PIPE_MESSAGE.testRequest

    <?php
    namespace App\Listener;
    
    use Imi\Event\EventParam;
    use Imi\Event\IEventListener;
    use Imi\Bean\Annotation\Listener;
    use Imi\Swoole\Server\Server;
    
    /**
     * @Listener("IMI.PIPE_MESSAGE.testRequest")
     */
    class TestRequestMessage implements IEventListener
    {
        /**
         * 事件处理方法
         * @param EventParam $e
         * @return void
         */
        public function handle(EventParam $e): void
        {
            $data = $e->getData()['data'];
            $datetime = date('Y-m-d H:i:s', $data['time']);
    
            // 发送响应结果
            Server::sendMessage('testResponse', [
                'messageId' =>  $data['messageId'];
                'datetime'  =>  $datetime,
            ], $e->getData()['workerId']);
        }
    
    }

    监听响应:

    暂时只有 Swoole 支持

    事件名称为:IMI.PIPE_MESSAGE.testResponse

    <?php
    namespace App\Listener;
    
    use Imi\Event\EventParam;
    use Imi\Event\IEventListener;
    use Imi\Bean\Annotation\Listener;
    use Imi\Swoole\Server\Server;
    use Imi\Swoole\Util\Co\ChannelContainer;
    
    /**
     * @Listener("IMI.PIPE_MESSAGE.testResponse")
     */
    class TestResponseMessage implements IEventListener
    {
        /**
         * 事件处理方法
         * @param EventParam $e
         * @return void
         */
        public function handle(EventParam $e): void
        {
            $data = $e->getData()['data'];
            if(ChannelContainer::hasChannel($data['messageId']))
            {
                // 推结果进 Channel
                ChannelContainer::push($data['messageId'], $data);
            }
        }
    
    }