AMQP

    目录

    介绍

    支持在 imi 框架中使用 支持 AMQP 协议的消息队列,如:RabbitMQ

    支持消息发布和消费

    Github: https://github.com/imiphp/imi-amqp

    Composer

    本项目可以使用composer安装,遵循psr-4自动加载规则,在你的 composer.json 中加入下面的内容:

    {
        "require": {
            "imiphp/imi-amqp": "~2.1.0"
        }
    }

    然后执行 composer update 安装。

    使用说明

    可以参考 example 目录示例,包括完整的消息发布和消费功能。

    在项目 config/config.php 中配置:

    [
        'components'    =>  [
            // 引入组件
            'AMQP'   =>  'Imi\AMQP',
        ],
    ]

    连接池配置:(Swoole)

    [
        'pools'    =>    [
            'rabbit'    =>  [
                'pool'    =>    [
                    'class'        =>    \Imi\AMQP\Pool\AMQPCoroutinePool::class,
                    'config'    =>    [
                        'maxResources'      => 10,
                        'minResources'      => 1,
                        'heartbeatInterval' => 30, // 连接池心跳时间,推荐设置
                    ],
                ],
                'resource'    =>    [
                    'host'            => '127.0.0.1',
                    'port'            => 5672,
                    'user'            => 'guest',
                    'password'        => 'guest',
                    'keepalive'       => false, // 截止 Swoole 4.8 还有兼容问题,所以必须设为 false,不影响使用
                    'connectionClass' => \PhpAmqpLib\Connection\AMQPStreamConnection::class,
                ]
            ],
        ]
    ]

    连接配置:(Workerman)

    'amqp' => [
        'connections' => [
            'rabbit'    => [
                'host'      => '127.0.0.1',
                'port'      => 5672,
                'user'      => 'guest',
                'password'  => 'guest',
            ],
        ],
    ],

    默认连接池:

    [
        'beans' =>  [
            'AMQP'  =>  [
                'defaultPoolName'   =>  'rabbit',
            ],
        ],
    ]

    队列组件支持

    本组件额外实现了 imiphp/imi-queue 的接口,可以用 Queue 组件的 API 进行调用。

    只需要将队列驱动配置为:KafkaQueueDriver

    配置示例:

    [
        'beans' =>  [
            'AutoRunProcessManager' =>  [
                'processes' =>  [
                    // 加入队列消费进程,非必须,你也可以自己写进程消费
                    'QueueConsumer',
                ],
            ],
            'imiQueue'  => [
                // 默认队列
                'default'   => 'QueueTest1',
                // 队列列表
                'list'  => [
                    // 队列名称
                    'QueueTest1' => [
                        // 使用的队列驱动
                        'driver'        => 'AMQPQueueDriver',
                        // 消费协程数量
                        'co'            => 1,
                        // 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准
                        'process'       => 1,
                        // 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)
                        'timespan'      => 0.1,
                        // 进程分组名称
                        'processGroup'  => 'a',
                        // 自动消费
                        'autoConsumer'  => true,
                        // 消费者类
                        'consumer'      => 'TestConsumer',
                        // 驱动类所需要的参数数组
                        'config'        => [
                            // AMQP 连接池名称
                            'poolName'      => 'rabbit',
                            // Redis 连接池名称
                            'redisPoolName' => 'redis',
                            // Redis 键名前缀
                            'redisPrefix'   => 'QueueTest1:',
                            // 可选配置:
                            // 支持消息删除功能,依赖 Redis
                            'supportDelete' => true,
                            // 支持消费超时队列功能,依赖 Redis,并且自动增加一个队列
                            'supportTimeout' => true,
                            // 支持消费失败队列功能,自动增加一个队列
                            'supportFail' => true,
                            // 循环尝试 pop 的时间间隔,单位:秒
                            'timespan'  => 0.03,
                            // 本地缓存的队列长度。由于 AMQP 不支持主动pop,而是主动推送,所以本地会有缓存队列,这个队列不宜过大。
                            'queueLength'   => 16,
                            // 消息类名
                            'message'   => \Imi\AMQP\Queue\JsonAMQPMessage::class,
                        ],
                    ],
                ],
            ],
        ]
    ]

    消费者类写法,与imi-queue组件用法一致。

    AMQP 消费发布

    这个写法仅 AMQP 有效,其它消息队列不能这么写。

    优点是可以完美利用 AMQP 特性,适合需要个性化定制的用户。

    连接配置项

    属性名称说明
    host主机
    port端口
    user用户名
    vhostvhost,默认/
    insistinsist
    loginMethod默认AMQPLAIN
    loginResponseloginResponse
    locale默认en_US
    connectionTimeout连接超时
    readWriteTimeout读写超时
    keepalivekeepalive,默认false
    heartbeat心跳时间。如果不设置的情况,设置了连接池的心跳,就会设置为该值的 2 倍,否则设为0
    channelRpcTimeout频道 RPC 超时时间,默认0.0
    sslProtocolssl 协议,默认null

    消息定义

    继承 Imi\AMQP\Message 类,可在构造方法中对属性修改。

    根据需要可以覆盖实现setBodyDatagetBodyData方法,实现自定义的消息结构。

    <?php
    namespace ImiApp\AMQP\Test2;
    
    use Imi\AMQP\Message;
    
    class TestMessage2 extends Message
    {
        /**
         * 用户ID
         *
         * @var int
         */
        private $memberId;
    
        /**
         * 内容
         *
         * @var string
         */
        private $content;
    
        public function __construct()
        {
            parent::__construct();
            $this->routingKey = 'imi-2';
            $this->format = \Imi\Util\Format\Json::class;
        }
    
        /**
         * 设置主体数据
         *
         * @param mixed $data
         * @return self
         */
        public function setBodyData($data)
        {
            foreach($data as $k => $v)
            {
                $this->$k = $v;
            }
        }
    
        /**
         * 获取主体数据
         *
         * @return mixed
         */
        public function getBodyData()
        {
            return [
                'memberId'  =>  $this->memberId,
                'content'   =>  $this->content,
            ];
        }
    
        /**
         * Get 用户ID
         *
         * @return int
         */ 
        public function getMemberId()
        {
            return $this->memberId;
        }
    
        /**
         * Set 用户ID
         *
         * @param int $memberId  用户ID
         *
         * @return self
         */ 
        public function setMemberId(int $memberId)
        {
            $this->memberId = $memberId;
    
            return $this;
        }
    
        /**
         * Get 内容
         *
         * @return string
         */ 
        public function getContent()
        {
            return $this->content;
        }
    
        /**
         * Set 内容
         *
         * @param string $content  内容
         *
         * @return self
         */ 
        public function setContent(string $content)
        {
            $this->content = $content;
    
            return $this;
        }
    
    }

    属性列表:

    名称说明默认值
    bodyData消息主体内容,非字符串null
    properties属性['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,]
    routingKey路由键空字符串
    format如果设置了,发布的消息是编码后的bodyData,同理读取时也会解码。实现了Imi\Util\Format\IFormat的格式化类。支持JsonPhpSerializenull
    mandatorymandatory标志位false
    immediateimmediate标志位false
    ticketticketnull

    发布者定义

    必选注解:@Publisher

    可选注解:@Queue@Exchange@Connection

    不配置 @Connection 注解,可以从连接池中获取连接

    <?php
    namespace ImiApp\AMQP\Test;
    
    use Imi\Bean\Annotation\Bean;
    use Imi\AMQP\Annotation\Queue;
    use Imi\AMQP\Base\BasePublisher;
    use Imi\AMQP\Annotation\Consumer;
    use Imi\AMQP\Annotation\Exchange;
    use Imi\AMQP\Annotation\Publisher;
    use Imi\AMQP\Annotation\Connection;
    
    /**
     * @Bean("TestPublisher")
     * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest")
     * @Publisher(tag="tag-imi", queue="queue-imi-1", exchange="exchange-imi", routingKey="imi-1")
     * @Queue(name="queue-imi-1", routingKey="imi-1")
     * @Exchange(name="exchange-imi")
     */
    class TestPublisher extends BasePublisher
    {
    
    }

    发布消息

    // 实例化构建消息
    $message = new \ImiApp\AMQP\Test2\TestMessage2;
    $message->setMemberId(1);
    $message->setContent('imi niubi');
    
    // 发布消息
    /** @var \ImiApp\AMQP\Test\TestPublisher $testPublisher */
    $testPublisher = \Imi\RequestContext::getBean('TestPublisher');
    // 请勿使用 App::getBean()、@Inject 等全局单例注入
    // $testPublisher = App::getBean('TestPublisher');
    $testPublisher->publish($message);

    消费者定义

    必选注解:@Consumer

    可选注解:@Queue@Exchange@Connection

    不配置 @Connection 注解,可以从连接池中获取连接

    <?php
    namespace ImiApp\AMQP\Test;
    
    use Imi\Redis\Redis;
    use Imi\Bean\Annotation\Bean;
    use Imi\AMQP\Annotation\Queue;
    use Imi\AMQP\Base\BaseConsumer;
    use Imi\AMQP\Contract\IMessage;
    use Imi\AMQP\Annotation\Consumer;
    use Imi\AMQP\Annotation\Exchange;
    use Imi\AMQP\Enum\ConsumerResult;
    use Imi\AMQP\Annotation\Connection;
    
    /**
     * 启动一个新连接消费
     * 
     * @Bean("TestConsumer")
     * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest")
     * @Consumer(tag="tag-imi", queue="queue-imi-1", message=\ImiApp\AMQP\Test\TestMessage::class)
     */
    class TestConsumer extends BaseConsumer
    {
        /**
         * 消费任务
         *
         * @param \ImiApp\AMQP\Test\TestMessage $message
         */
        protected function consume(IMessage $message): int
        {
            var_dump(__CLASS__, $message->getBody(), get_class($message));
            Redis::set('imi-amqp:consume:1:' . $message->getMemberId(), $message->getBody());
            return ConsumerResult::ACK;
        }
    
    }
    

    消费消息

    随服务启动的消费进程

    只会启动一个进程,适合量少的场景。适合IO密集型场景。

    首先定义进程:

    <?php
    namespace ImiApp\Process;
    
    use Imi\Swoole\Process\BaseProcess;
    use Imi\Aop\Annotation\Inject;
    use Imi\Swoole\Process\Annotation\Process;
    
    /**
     * @Process(name="TestProcess")
     */
    class TestProcess extends BaseProcess
    {
        /**
         * @Inject("TestConsumer")
         *
         * @var \ImiApp\AMQP\Test\TestConsumer
         */
        protected $testConsumer;
    
        /**
         * @Inject("TestConsumer2")
         *
         * @var \ImiApp\AMQP\Test2\TestConsumer2
         */
        protected $testConsumer2;
    
        public function run(\Swoole\Process $process): void
        {
            // 启动消费者
            go(function(){
                do {
                    $this->testConsumer->run();
                } while(true);
            });
            go(function(){
                do {
                    $this->testConsumer2->run();
                } while(true);
            });
        }
    
    }

    然后在项目配置@app.beans中配置消费进程

    [
        'AutoRunProcessManager' =>  [
            'processes' =>  [
                'TestProcess'
            ],
        ],
    ]
    启动进程池消费

    适合计算密集型场景、消费量非常多的场景。

    进程池写法参考:https://doc.imiphp.com/v2.1/components/process-pool/swoole.html

    启动消费者写法参考上面的即可。

    注解说明

    @Publisher

    发布者注解

    属性名称说明
    queue队列名称
    exchange交换机名称
    routingKey路由键

    @Consumer

    消费者注解

    属性名称说明
    tag消费者标签
    queue队列名称
    exchange交换机名称
    routingKey路由键
    message消息类名,默认:Imi\AMQP\Message
    mandatorymandatory标志位
    immediateimmediate标志位
    ticketticket

    @Queue

    队列注解

    属性名称说明
    name队列名称
    routingKey路由键
    passive被动模式,默认false
    durable消息队列持久化,默认true
    exclusive独占,默认false
    autoDelete自动删除,默认false
    nowait是否非阻塞,默认false
    arguments参数
    ticketticket

    @Exchange

    交换机注解

    属性名称说明
    name交换机名称
    type类型可选:directfanouttopicheaders
    passive被动模式,默认false
    durable消息队列持久化,默认true
    autoDelete自动删除,默认false
    internal设置是否为rabbitmq内部使用, true表示是内部使用, false表示不是内部使用
    nowait是否非阻塞,默认false
    arguments参数
    ticketticket

    @Connection

    连接注解

    属性名称说明
    poolName不为 null 时,无视其他属性,直接用该连接池配置。默认为null,如果hostportuserpassword都未设置,则获取默认的连接池。
    host主机
    port端口
    user用户名
    vhostvhost,默认/
    insistinsist
    loginMethod默认AMQPLAIN
    loginResponseloginResponse
    locale默认en_US
    connectionTimeout连接超时
    readWriteTimeout读写超时
    keepalivekeepalive,默认false
    heartbeat心跳时间,默认0
    channelRpcTimeout频道 RPC 超时时间,默认0.0
    sslProtocolssl 协议,默认null