AMQP

    目录

    介绍

    支持在 imi 框架中使用 支持 AMQP 协议的消息队列,如:RabbitMQ

    支持消息发布和消费

    Github: https://github.com/imiphp/imi-amqp

    Composer

    本项目可以使用composer安装,遵循psr-4自动加载规则,在你的 composer.json 中加入下面的内容:

    {
        "require": {
            "imiphp/imi-amqp": "~2.1.0"
        }
    }

    然后执行 composer update 安装。

    使用说明

    可以参考 example 目录示例,包括完整的消息发布和消费功能。

    在项目 config/config.php 中配置:

    [
        'components'    =>  [
            // 引入组件
            'AMQP'   =>  'Imi\AMQP',
        ],
    ]

    连接池配置:(Swoole)

    [
        'pools'    =>    [
            'rabbit'    =>  [
                'pool'    =>    [
                    'class'        =>    \Imi\AMQP\Pool\AMQPCoroutinePool::class,
                    'config'    =>    [
                        'maxResources'      => 10,
                        'minResources'      => 1,
                        'heartbeatInterval' => 30, // 连接池心跳时间,推荐设置
                    ],
                ],
                'resource'    =>    [
                    'host'            => '127.0.0.1',
                    'port'            => 5672,
                    'user'            => 'guest',
                    'password'        => 'guest',
                    'keepalive'       => false, // 截止 Swoole 4.8 还有兼容问题,所以必须设为 false,不影响使用
                    'connectionClass' => \PhpAmqpLib\Connection\AMQPStreamConnection::class,
                ]
            ],
        ]
    ]

    连接配置:(Workerman)

    'amqp' => [
        'connections' => [
            'rabbit'    => [
                'host'      => '127.0.0.1',
                'port'      => 5672,
                'user'      => 'guest',
                'password'  => 'guest',
            ],
        ],
    ],

    默认连接池:

    [
        'beans' =>  [
            'AMQP'  =>  [
                'defaultPoolName'   =>  'rabbit',
            ],
        ],
    ]

    队列组件支持

    本组件额外实现了 imiphp/imi-queue 的接口,可以用 Queue 组件的 API 进行调用。

    只需要将队列驱动配置为:KafkaQueueDriver

    配置示例:

    [
        'beans' =>  [
            'AutoRunProcessManager' =>  [
                'processes' =>  [
                    // 加入队列消费进程,非必须,你也可以自己写进程消费
                    'QueueConsumer',
                ],
            ],
            'imiQueue'  => [
                // 默认队列
                'default'   => 'QueueTest1',
                // 队列列表
                'list'  => [
                    // 队列名称
                    'QueueTest1' => [
                        // 使用的队列驱动
                        'driver'        => 'AMQPQueueDriver',
                        // 消费协程数量
                        'co'            => 1,
                        // 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准
                        'process'       => 1,
                        // 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)
                        'timespan'      => 0.1,
                        // 进程分组名称
                        'processGroup'  => 'a',
                        // 自动消费
                        'autoConsumer'  => true,
                        // 消费者类
                        'consumer'      => 'TestConsumer',
                        // 驱动类所需要的参数数组
                        'config'        => [
                            // AMQP 连接池名称
                            'poolName'      => 'rabbit',
                            // Redis 连接池名称
                            'redisPoolName' => 'redis',
                            // Redis 键名前缀
                            'redisPrefix'   => 'QueueTest1:',
                            // 可选配置:
                            // 支持消息删除功能,依赖 Redis
                            'supportDelete' => true,
                            // 支持消费超时队列功能,依赖 Redis,并且自动增加一个队列
                            'supportTimeout' => true,
                            // 支持消费失败队列功能,自动增加一个队列
                            'supportFail' => true,
                            // 循环尝试 pop 的时间间隔,单位:秒
                            'timespan'  => 0.03,
                            // 本地缓存的队列长度。由于 AMQP 不支持主动pop,而是主动推送,所以本地会有缓存队列,这个队列不宜过大。
                            'queueLength'   => 16,
                            // 消息类名
                            'message'   => \Imi\AMQP\Queue\JsonAMQPMessage::class,
                        ],
                    ],
                ],
            ],
        ]
    ]

    消费者类写法,与imi-queue组件用法一致。

    AMQP 消费发布

    这个写法仅 AMQP 有效,其它消息队列不能这么写。

    优点是可以完美利用 AMQP 特性,适合需要个性化定制的用户。

    连接配置项

    属性名称说明
    host主机
    port端口
    user用户名
    vhostvhost,默认/
    insistinsist
    loginMethod默认AMQPLAIN
    loginResponseloginResponse
    locale默认en_US
    connectionTimeout连接超时
    readWriteTimeout读写超时
    keepalivekeepalive,默认false
    heartbeat心跳时间。如果不设置的情况,设置了连接池的心跳,就会设置为该值的 2 倍,否则设为0
    channelRpcTimeout频道 RPC 超时时间,默认0.0
    sslProtocolssl 协议,默认null

    消息定义

    继承 Imi\AMQP\Message 类,可在构造方法中对属性修改。

    根据需要可以覆盖实现setBodyDatagetBodyData方法,实现自定义的消息结构。

    <?php
    namespace ImiApp\AMQP\Test2;
    
    use Imi\AMQP\Message;
    
    class TestMessage2 extends Message
    {
        /**
         * 用户ID
         *
         * @var int
         */
        private $memberId;
    
        /**
         * 内容
         *
         * @var string
         */
        private $content;
    
        public function __construct()
        {
            parent::__construct();
            $this->routingKey = 'imi-2';
            $this->format = \Imi\Util\Format\Json::class;
        }
    
        /**
         * 设置主体数据
         *
         * @param mixed $data
         * @return self
         */
        public function setBodyData($data)
        {
            foreach($data as $k => $v)
            {
                $this->$k = $v;
            }
        }
    
        /**
         * 获取主体数据
         *
         * @return mixed
         */
        public function getBodyData()
        {
            return [
                'memberId'  =>  $this->memberId,
                'content'   =>  $this->content,
            ];
        }
    
        /**
         * Get 用户ID
         *
         * @return int
         */ 
        public function getMemberId()
        {
            return $this->memberId;
        }
    
        /**
         * Set 用户ID
         *
         * @param int $memberId  用户ID
         *
         * @return self
         */ 
        public function setMemberId(int $memberId)
        {
            $this->memberId = $memberId;
    
            return $this;
        }
    
        /**
         * Get 内容
         *
         * @return string
         */ 
        public function getContent()
        {
            return $this->content;
        }
    
        /**
         * Set 内容
         *
         * @param string $content  内容
         *
         * @return self
         */ 
        public function setContent(string $content)
        {
            $this->content = $content;
    
            return $this;
        }
    
    }

    属性列表:

    名称说明默认值
    bodyData消息主体内容,非字符串null
    properties属性['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,]
    routingKey路由键空字符串
    format如果设置了,发布的消息是编码后的bodyData,同理读取时也会解码。实现了Imi\Util\Format\IFormat的格式化类。支持JsonPhpSerializenull
    mandatorymandatory标志位false
    immediateimmediate标志位false
    ticketticketnull

    发布者定义

    必选注解:@Publisher

    可选注解:@Queue@Exchange@Connection

    不配置 @Connection 注解,可以从连接池中获取连接

    <?php
    namespace ImiApp\AMQP\Test;
    
    use Imi\Bean\Annotation\Bean;
    use Imi\AMQP\Annotation\Queue;
    use Imi\AMQP\Base\BasePublisher;
    use Imi\AMQP\Annotation\Consumer;
    use Imi\AMQP\Annotation\Exchange;
    use Imi\AMQP\Annotation\Publisher;
    use Imi\AMQP\Annotation\Connection;
    
    /**
     * @Bean("TestPublisher")
     * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest")
     * @Publisher(tag="tag-imi", queue="queue-imi-1", exchange="exchange-imi", routingKey="imi-1")
     * @Queue(name="queue-imi-1", routingKey="imi-1")
     * @Exchange(name="exchange-imi")
     */
    class TestPublisher extends BasePublisher
    {
    
    }

    发布消息

    // 实例化构建消息
    $message = new \ImiApp\AMQP\Test2\TestMessage2;
    $message->setMemberId(1);
    $message->setContent('imi niubi');
    
    // 发布消息
    /** @var \ImiApp\AMQP\Test\TestPublisher $testPublisher */
    $testPublisher = \Imi\RequestContext::getBean('TestPublisher');
    // 请勿使用 App::getBean()、@Inject 等全局单例注入
    // $testPublisher = App::getBean('TestPublisher');
    $testPublisher->publish($message);

    消费者定义

    必选注解:@Consumer

    可选注解:@Queue@Exchange@Connection

    不配置 @Connection 注解,可以从连接池中获取连接

    <?php
    namespace ImiApp\AMQP\Test;
    
    use Imi\Redis\Redis;
    use Imi\Bean\Annotation\Bean;
    use Imi\AMQP\Annotation\Queue;
    use Imi\AMQP\Base\BaseConsumer;
    use Imi\AMQP\Contract\IMessage;
    use Imi\AMQP\Annotation\Consumer;
    use Imi\AMQP\Annotation\Exchange;
    use Imi\AMQP\Enum\ConsumerResult;
    use Imi\AMQP\Annotation\Connection;
    
    /**
     * 启动一个新连接消费
     * 
     * @Bean("TestConsumer")
     * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest")
     * @Consumer(tag="tag-imi", queue="queue-imi-1", message=\ImiApp\AMQP\Test\TestMessage::class)
     */
    class TestConsumer extends BaseConsumer
    {
        /**
         * 消费任务
         *
         * @param \ImiApp\AMQP\Test\TestMessage $message
         */
        protected function consume(IMessage $message): int
        {
            var_dump(__CLASS__, $message->getBody(), get_class($message));
            Redis::set('imi-amqp:consume:1:' . $message->getMemberId(), $message->getBody());
            return ConsumerResult::ACK;
        }
    
    }
    

    消费消息

    随服务启动的消费进程

    只会启动一个进程,适合量少的场景。适合IO密集型场景。

    首先定义进程:

    <?php
    namespace ImiApp\Process;
    
    use Imi\Swoole\Process\BaseProcess;
    use Imi\Aop\Annotation\Inject;
    use Imi\Swoole\Process\Annotation\Process;
    
    /**
     * @Process(name="TestProcess")
     */
    class TestProcess extends BaseProcess
    {
        /**
         * @Inject("TestConsumer")
         *
         * @var \ImiApp\AMQP\Test\TestConsumer
         */
        protected $testConsumer;
    
        /**
         * @Inject("TestConsumer2")
         *
         * @var \ImiApp\AMQP\Test2\TestConsumer2
         */
        protected $testConsumer2;
    
        public function run(\Swoole\Process $process): void
        {
            // 启动消费者
            go(function(){
                do {
                    $this->testConsumer->run();
                } while(true);
            });
            go(function(){
                do {
                    $this->testConsumer2->run();
                } while(true);
            });
        }
    
    }

    然后在项目配置@app.beans中配置消费进程

    [
        'AutoRunProcessManager' =>  [
            'processes' =>  [
                'TestProcess'
            ],
        ],
    ]
    启动进程池消费

    适合计算密集型场景、消费量非常多的场景。

    进程池写法参考:链接

    启动消费者写法参考上面的即可。

    注解说明

    @Publisher

    发布者注解

    属性名称说明
    queue队列名称
    exchange交换机名称
    routingKey路由键

    @Consumer

    消费者注解

    属性名称说明
    tag消费者标签
    queue队列名称
    exchange交换机名称
    routingKey路由键
    message消息类名,默认:Imi\AMQP\Message
    mandatorymandatory标志位
    immediateimmediate标志位
    ticketticket

    @Queue

    队列注解

    属性名称说明
    name队列名称
    routingKey路由键
    passive被动模式,默认false
    durable消息队列持久化,默认true
    exclusive独占,默认false
    autoDelete自动删除,默认false
    nowait是否非阻塞,默认false
    arguments参数
    ticketticket

    @Exchange

    交换机注解

    属性名称说明
    name交换机名称
    type类型可选:directfanouttopicheaders
    passive被动模式,默认false
    durable消息队列持久化,默认true
    autoDelete自动删除,默认false
    internal设置是否为rabbitmq内部使用, true表示是内部使用, false表示不是内部使用
    nowait是否非阻塞,默认false
    arguments参数
    ticketticket

    @Connection

    连接注解

    属性名称说明
    poolName不为 null 时,无视其他属性,直接用该连接池配置。默认为null,如果hostportuserpassword都未设置,则获取默认的连接池。
    host主机
    port端口
    user用户名
    vhostvhost,默认/
    insistinsist
    loginMethod默认AMQPLAIN
    loginResponseloginResponse
    locale默认en_US
    connectionTimeout连接超时
    readWriteTimeout读写超时
    keepalivekeepalive,默认false
    heartbeat心跳时间,默认0
    channelRpcTimeout频道 RPC 超时时间,默认0.0
    sslProtocolssl 协议,默认null

    使用示例

    延时消息

    支付宝、微信支付都有一个逻辑,就是用户支付成功后会通过 HTTP 请求来通知我们的接口。

    接口如果没有按照约定的格式返回成功,会定时重试N次,每次延时不同,直到超过一定次数后,就不再重试。

    下面是一个支付通知的消费者示例:

    消费者类:

    <?php
    
    namespace PayService\Module\Pay\AMQP\PayNotify;
    
    use Imi\AMQP\Annotation\Consumer;
    use Imi\AMQP\Annotation\Exchange;
    use Imi\AMQP\Annotation\Queue;
    use Imi\AMQP\Base\BaseConsumer;
    use Imi\AMQP\Contract\IMessage;
    use Imi\AMQP\Enum\ConsumerResult;
    use Imi\Bean\Annotation\AnnotationManager;
    use Imi\Bean\Annotation\Bean;
    use Imi\Bean\BeanFactory;
    use Imi\Log\Log;
    use PhpAmqpLib\Exchange\AMQPExchangeType;
    use PhpAmqpLib\Message\AMQPMessage;
    
    /**
     * @Bean("PayNotifyConsumer")
     *
     * @Exchange(name="exchange-pay-notify")
     *
     * @Queue(name="pay-notify", arguments={"x-dead-letter-exchange"="exchange-pay-notify-dead"})
     *
     * @Exchange(name="exchange-pay-notify-step-1")
     *
     * @Queue(name="pay-notify-step-1", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=15000})
     *
     * @Exchange(name="exchange-pay-notify-step-2")
     *
     * @Queue(name="pay-notify-step-2", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=15000})
     *
     * @Exchange(name="exchange-pay-notify-step-3")
     *
     * @Queue(name="pay-notify-step-3", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=30000})
     *
     * @Exchange(name="exchange-pay-notify-step-4")
     *
     * @Queue(name="pay-notify-step-4", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=180000})
     *
     * @Exchange(name="exchange-pay-notify-step-5")
     *
     * @Queue(name="pay-notify-step-5", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=600000})
     *
     * @Exchange(name="exchange-pay-notify-step-6")
     *
     * @Queue(name="pay-notify-step-6", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=1200000})
     *
     * @Exchange(name="exchange-pay-notify-step-7")
     *
     * @Queue(name="pay-notify-step-7", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=1800000})
     *
     * @Exchange(name="exchange-pay-notify-step-8")
     *
     * @Queue(name="pay-notify-step-8", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=1800000})
     *
     * @Exchange(name="exchange-pay-notify-step-9")
     *
     * @Queue(name="pay-notify-step-9", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=1800000})
     *
     * @Exchange(name="exchange-pay-notify-step-10")
     *
     * @Queue(name="pay-notify-step-10", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=3600000})
     *
     * @Exchange(name="exchange-pay-notify-step-11")
     *
     * @Queue(name="pay-notify-step-11", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=10800000})
     *
     * @Exchange(name="exchange-pay-notify-step-12")
     *
     * @Queue(name="pay-notify-step-12", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=10800000})
     *
     * @Exchange(name="exchange-pay-notify-step-13")
     *
     * @Queue(name="pay-notify-step-13", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=10800000})
     *
     * @Exchange(name="exchange-pay-notify-step-14")
     *
     * @Queue(name="pay-notify-step-14", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=21600000})
     *
     * @Exchange(name="exchange-pay-notify-step-15")
     *
     * @Queue(name="pay-notify-step-15", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=21600000})
     *
     * @Consumer(tag="payNotify", queue="pay-notify", exchange="exchange-pay-notify", message=\PayService\Module\Pay\AMQP\PayNotify\PayNotifyMessage::class)
     */
    class PayNotifyConsumer extends BaseConsumer
    {
        /**
         * 定义消费者.
         *
         * @return void
         */
        protected function declareConsumer(): void
        {
            // 定义死信
            $this->channel->exchange_declare('exchange-pay-notify-dead', AMQPExchangeType::DIRECT, false, true, false);
            $this->channel->queue_declare('pay-notify-dead', false, true, false, false);
            $this->channel->queue_bind('pay-notify-dead', 'exchange-pay-notify-dead');
            $this->channel->queue_bind('pay-notify-dead', 'exchange-pay-notify-dead', 'pay-notify');
            $this->channel->queue_bind('pay-notify-dead', 'exchange-pay-notify-dead', 'pay-notify-dead');
            parent::declareConsumer();
        }
    
        /**
         * 消费任务
         *
         * @param \PayService\Module\Pay\AMQP\PayNotify\PayNotifyMessage $message
         *
         * @return void
         */
        protected function consume(IMessage $message)
        {
            try
            {
                // 是否需要重试
                $needRetry = false;
                // HTTP通知逻辑
                // ...
                // $needRetry = true; // 通知失败,需要重试
            }
            // 你也可以定义一个用于重试的异常,在上面逻辑中抛出
            // catch (RetryException $re)
            // {
            //     $needRetry = true;
            // }
            catch (\Throwable $th)
            {
                throw $th;
            }
            finally
            {
                if (isset($th))
                {
                    $result = ConsumerResult::REJECT;
                }
                elseif ($needRetry)
                {
                    $stepCount = \count(AnnotationManager::getClassAnnotations(BeanFactory::getObjectClass($this), Queue::class));
                    if ($message->getRetryCount() < $stepCount - 1)
                    {
                        $newMessage = clone $message;
                        $step = $message->getRetryCount() + 1;
                        $newMessage->setRetryCount($step);
                        $amqpMessage = $newMessage->getAMQPMessage();
                        $amqpMessage->set('delivery_mode', AMQPMessage::DELIVERY_MODE_PERSISTENT);
                        $amqpMessage->setBody($newMessage->getBody());
                        $queueName = 'pay-notify-step-' . $step;
                        $exchangeName = 'exchange-pay-notify-step-' . $step;
                        $this->channel->queue_bind($queueName, $exchangeName);
                        $this->channel->basic_publish($amqpMessage, $exchangeName);
                    }
                    else
                    {
                        $result = ConsumerResult::REJECT;
                    }
                }
            }
    
            return $result ?? ConsumerResult::ACK;
        }
    }

    PayNotifyMessage:

    <?php
    
    namespace PayService\Module\Pay\AMQP\PayNotify;
    
    use Imi\AMQP\Message;
    
    class PayNotifyMessage extends Message
    {
        /**
         * 支付订单ID.
         *
         * @var int
         */
        private $payOrderId;
    
        /**
         * 延时多少秒执行.
         *
         * @var int
         */
        private $retryCount = 0;
    
        public function __construct()
        {
            parent::__construct();
            $this->format = \Imi\Util\Format\Json::class;
        }
    
        /**
         * 设置主体数据.
         *
         * @param mixed $data
         *
         * @return self
         */
        public function setBodyData($data): self
        {
            foreach ($data as $k => $v)
            {
                $this->$k = $v;
            }
    
            return $this;
        }
    
        /**
         * 获取主体数据.
         *
         * @return mixed
         */
        public function getBodyData()
        {
            return [
                'payOrderId'    => $this->payOrderId,
                'retryCount'    => $this->retryCount,
            ];
        }
    
        /**
         * Get 支付订单ID.
         *
         * @return int
         */
        public function getPayOrderId()
        {
            return $this->payOrderId;
        }
    
        /**
         * Set 支付订单ID.
         *
         * @param int $payOrderId 支付订单ID
         *
         * @return self
         */
        public function setPayOrderId(int $payOrderId)
        {
            $this->payOrderId = $payOrderId;
    
            return $this;
        }
    
        /**
         * Get 延时多少秒执行.
         *
         * @return int
         */
        public function getRetryCount()
        {
            return $this->retryCount;
        }
    
        /**
         * Set 延时多少秒执行.
         *
         * @param int $retryCount 延时多少秒执行
         *
         * @return self
         */
        public function setRetryCount(int $retryCount)
        {
            $this->retryCount = $retryCount;
    
            return $this;
        }
    }