AMQP

    目录

    介绍

    支持在 imi 框架中使用 支持 AMQP 协议的消息队列,如:RabbitMQ

    支持消息发布和消费

    Github: https://github.com/imiphp/imi-amqp

    Composer

    本项目可以使用composer安装,遵循psr-4自动加载规则,在你的 composer.json 中加入下面的内容:

    {
        "require": {
            "imiphp/imi-amqp": "~3.0.0"
        }
    }

    然后执行 composer update 安装。

    使用说明

    可以参考 example 目录示例,包括完整的消息发布和消费功能。

    在项目 config/config.php 中配置:

    [
        'components'    =>  [
            // 引入组件
            'AMQP'   =>  'Imi\AMQP',
        ],
    ]

    连接池配置:(Swoole)

    [
        'pools'    =>    [
            'rabbit'    =>  [
                'pool'    =>    [
                    'class'        =>    \Imi\AMQP\Pool\AMQPCoroutinePool::class,
                    'config'    =>    [
                        'maxResources'      => 10,
                        'minResources'      => 1,
                        'heartbeatInterval' => 30, // 连接池心跳时间,推荐设置
                    ],
                ],
                'resource'    =>    [
                    'host'            => '127.0.0.1',
                    'port'            => 5672,
                    'user'            => 'guest',
                    'password'        => 'guest',
                    'keepalive'       => false, // 截止 Swoole 4.8 还有兼容问题,所以必须设为 false,不影响使用
                    'connectionClass' => \PhpAmqpLib\Connection\AMQPStreamConnection::class,
                ]
            ],
        ]
    ]

    连接配置:(Workerman)

    'amqp' => [
        'connections' => [
            'rabbit'    => [
                'host'      => '127.0.0.1',
                'port'      => 5672,
                'user'      => 'guest',
                'password'  => 'guest',
            ],
        ],
    ],

    默认连接池:

    [
        'beans' =>  [
            'AMQP'  =>  [
                'defaultPoolName'   =>  'rabbit',
            ],
        ],
    ]

    连接配置项

    属性名称说明
    host主机
    port端口
    user用户名
    vhostvhost,默认 /
    connectionTimeout连接超时
    readTimeout读超时
    writeTimeout写超时
    channelRpcTimeout频道 RPC 超时时间,默认 0.0
    heartbeat心跳时间。如果不设置的情况,设置了连接池的心跳,就会设置为该值的 2 倍,否则设为0
    keepalivekeepalive,默认 false
    isSecure是否启用加密通信,默认 false
    ioTypeio 类型,默认 stream,可选:streamsocket
    insistinsist
    loginMethod默认 AMQPLAIN
    loginResponseloginResponse
    locale默认 en_US
    amqpProtocolAMQP 协议,默认 0.9.1
    protocolStrictFields是否使用严格的 AMQP 0.9.1 字段类型。RabbitMQ 不支持这个。默认 false
    sendBufferSize发送缓冲区大小,默认 0
    sslCaCertCA 证书内容
    sslCaPathCA 证书地址
    sslCertSSL 证书
    sslKeySSL 证书密钥
    sslVerify是否验证 SSL 证书
    sslVerifyNameSSL 证书验证名称
    sslPassPhraseSSL 证书密码短语
    sslCiphersSSL 密码
    sslSecurityLevelSSL 安全等级
    isLazy是否懒加载,默认 false,不推荐修改
    networkProtocol网络协议,默认 tcp,不推荐修改
    streamContext流上下文,默认 null,不推荐修改
    dispatchSignals无用项,默认 true,不推荐修改
    connectionName连接名称,不推荐修改
    debugPackets输出所有网络数据包以进行调试。,默认 false,不推荐修改

    队列组件支持

    本组件额外实现了 imiphp/imi-queue 的接口,可以用 Queue 组件的 API 进行调用。

    只需要将队列驱动配置为:KafkaQueueDriver

    配置示例:

    [
        'beans' =>  [
            'AutoRunProcessManager' =>  [
                'processes' =>  [
                    // 加入队列消费进程,非必须,你也可以自己写进程消费
                    'QueueConsumer',
                ],
            ],
            'imiQueue'  => [
                // 默认队列
                'default'   => 'QueueTest1',
                // 队列列表
                'list'  => [
                    // 队列名称
                    'QueueTest1' => [
                        // 使用的队列驱动
                        'driver'        => 'AMQPQueueDriver',
                        // 消费协程数量
                        'co'            => 1,
                        // 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准
                        'process'       => 1,
                        // 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)
                        'timespan'      => 0.1,
                        // 进程分组名称
                        'processGroup'  => 'a',
                        // 自动消费
                        'autoConsumer'  => true,
                        // 消费者类
                        'consumer'      => 'TestConsumer',
                        // 驱动类所需要的参数数组
                        'config'        => [
                            // AMQP 连接池名称
                            'poolName'      => 'rabbit',
                            // Redis 连接池名称
                            'redisPoolName' => 'redis',
                            // Redis 键名前缀
                            'redisPrefix'   => 'QueueTest1:',
                            // 可选配置:
                            // 支持消息删除功能,依赖 Redis
                            'supportDelete' => true,
                            // 支持消费超时队列功能,依赖 Redis,并且自动增加一个队列
                            'supportTimeout' => true,
                            // 支持消费失败队列功能,自动增加一个队列
                            'supportFail' => true,
                            // 循环尝试 pop 的时间间隔,单位:秒
                            'timespan'  => 0.03,
                            // 本地缓存的队列长度。由于 AMQP 不支持主动pop,而是主动推送,所以本地会有缓存队列,这个队列不宜过大。
                            'queueLength'   => 16,
                            // 消息类名
                            'message'   => \Imi\AMQP\Queue\JsonAMQPMessage::class,
                        ],
                    ],
                ],
            ],
        ]
    ]

    消费者类写法,与imi-queue组件用法一致。

    AMQP 消费发布

    这个写法仅 AMQP 有效,其它消息队列不能这么写。

    优点是可以完美利用 AMQP 特性,适合需要个性化定制的用户。

    消息定义

    继承 Imi\AMQP\Message 类,可在构造方法中对属性修改。

    根据需要可以覆盖实现setBodyDatagetBodyData方法,实现自定义的消息结构。

    <?php
    namespace ImiApp\AMQP\Test2;
    
    use Imi\AMQP\Message;
    
    class TestMessage2 extends Message
    {
        /**
         * 用户ID
         *
         * @var int
         */
        private $memberId;
    
        /**
         * 内容
         *
         * @var string
         */
        private $content;
    
        public function __construct()
        {
            parent::__construct();
            $this->routingKey = 'imi-2';
            $this->format = \Imi\Util\Format\Json::class;
        }
    
        /**
         * 设置主体数据
         *
         * @param mixed $data
         * @return self
         */
        public function setBodyData($data)
        {
            foreach($data as $k => $v)
            {
                $this->$k = $v;
            }
        }
    
        /**
         * 获取主体数据
         *
         * @return mixed
         */
        public function getBodyData()
        {
            return [
                'memberId'  =>  $this->memberId,
                'content'   =>  $this->content,
            ];
        }
    
        /**
         * Get 用户ID
         *
         * @return int
         */ 
        public function getMemberId()
        {
            return $this->memberId;
        }
    
        /**
         * Set 用户ID
         *
         * @param int $memberId  用户ID
         *
         * @return self
         */ 
        public function setMemberId(int $memberId)
        {
            $this->memberId = $memberId;
    
            return $this;
        }
    
        /**
         * Get 内容
         *
         * @return string
         */ 
        public function getContent()
        {
            return $this->content;
        }
    
        /**
         * Set 内容
         *
         * @param string $content  内容
         *
         * @return self
         */ 
        public function setContent(string $content)
        {
            $this->content = $content;
    
            return $this;
        }
    
    }

    属性列表:

    名称说明默认值
    bodyData消息主体内容,非字符串null
    properties属性['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,]
    routingKey路由键空字符串
    format如果设置了,发布的消息是编码后的bodyData,同理读取时也会解码。实现了Imi\Util\Format\IFormat的格式化类。支持JsonPhpSerializenull
    mandatorymandatory标志位false
    immediateimmediate标志位false
    ticketticketnull

    发布者定义

    必选注解:Publisher

    可选注解:QueueExchangeConnection

    不配置 Connection 注解,从默认连接池中获取连接

    <?php
    namespace ImiApp\AMQP\Test;
    
    use Imi\Bean\Annotation\Bean;
    use Imi\AMQP\Annotation\Queue;
    use Imi\AMQP\Base\BasePublisher;
    use Imi\AMQP\Annotation\Consumer;
    use Imi\AMQP\Annotation\Exchange;
    use Imi\AMQP\Annotation\Publisher;
    use Imi\AMQP\Annotation\Connection;
    
    #[
        Bean(name: 'TestPublisher'),
        Connection(poolName: null),
        Publisher(tag: 'tag-imi', queue: 'queue-imi-1', exchange: 'exchange-imi', routingKey: 'imi-1'),
        Queue(name: 'queue-imi-1', routingKey: 'imi-1'),
        Exchange(name: 'exchange-imi')
    ]
    class TestPublisher extends BasePublisher
    {
    
    }

    发布消息

    // 实例化构建消息
    $message = new \ImiApp\AMQP\Test2\TestMessage2;
    $message->setMemberId(1);
    $message->setContent('imi niubi');
    
    // 发布消息
    /** @var \ImiApp\AMQP\Test\TestPublisher $testPublisher */
    $testPublisher = \Imi\RequestContext::getBean('TestPublisher');
    // 请勿使用 App::getBean()、Inject 等全局单例注入
    // $testPublisher = App::getBean('TestPublisher');
    $testPublisher->publish($message);

    消费者定义

    必选注解:Consumer

    可选注解:QueueExchangeConnection

    不配置 Connection 注解,从默认连接池中获取连接

    <?php
    namespace ImiApp\AMQP\Test;
    
    use Imi\Redis\Redis;
    use Imi\Bean\Annotation\Bean;
    use Imi\AMQP\Annotation\Queue;
    use Imi\AMQP\Base\BaseConsumer;
    use Imi\AMQP\Contract\IMessage;
    use Imi\AMQP\Annotation\Consumer;
    use Imi\AMQP\Annotation\Exchange;
    use Imi\AMQP\Enum\ConsumerResult;
    use Imi\AMQP\Annotation\Connection;
    
    /**
     * 启动一个新连接消费
     */
    #[
        Bean(name: 'TestConsumer'),
        Connection(poolName: null),
        Consumer(tag: "tag-imi", queue: "queue-imi-1", message: \ImiApp\AMQP\Test\TestMessage::class)
    ]
    class TestConsumer extends BaseConsumer
    {
        /**
         * 消费任务
         *
         * @param \ImiApp\AMQP\Test\TestMessage $message
         */
        protected function consume(IMessage $message): int
        {
            var_dump(__CLASS__, $message->getBody(), get_class($message));
            Redis::set('imi-amqp:consume:1:' . $message->getMemberId(), $message->getBody());
            return ConsumerResult::ACK;
        }
    
    }
    

    消费消息

    随服务启动的消费进程

    只会启动一个进程,适合量少的场景。适合IO密集型场景。

    首先定义进程:

    <?php
    namespace ImiApp\Process;
    
    use Imi\Swoole\Process\BaseProcess;
    use Imi\Aop\Annotation\Inject;
    use Imi\Swoole\Process\Annotation\Process;
    
    #[Process(name: 'TestProcess')]
    class TestProcess extends BaseProcess
    {
        /**
         * @var \ImiApp\AMQP\Test\TestConsumer
         */
        #[Inject(name: 'TestConsumer')]
        protected $testConsumer;
    
        /**
         * @var \ImiApp\AMQP\Test2\TestConsumer2
         */
        #[Inject(name: 'TestConsumer2')]
        protected $testConsumer2;
    
        public function run(\Swoole\Process $process): void
        {
            // 启动消费者
            go(function(){
                do {
                    $this->testConsumer->run();
                } while(true);
            });
            go(function(){
                do {
                    $this->testConsumer2->run();
                } while(true);
            });
        }
    
    }

    然后在项目配置@app.beans中配置消费进程

    [
        'AutoRunProcessManager' =>  [
            'processes' =>  [
                'TestProcess'
            ],
        ],
    ]
    启动进程池消费

    适合计算密集型场景、消费量非常多的场景。

    进程池写法参考:链接

    启动消费者写法参考上面的即可。

    注解说明

    Publisher

    发布者注解

    属性名称说明
    queue队列名称
    exchange交换机名称
    routingKey路由键

    Consumer

    消费者注解

    属性名称说明
    tag消费者标签
    queue队列名称
    exchange交换机名称
    routingKey路由键
    message消息类名,默认:Imi\AMQP\Message
    mandatorymandatory标志位
    immediateimmediate标志位
    ticketticket

    Queue

    队列注解

    属性名称说明
    name队列名称
    routingKey路由键
    passive被动模式,默认false
    durable消息队列持久化,默认true
    exclusive独占,默认false
    autoDelete自动删除,默认false
    nowait是否非阻塞,默认false
    arguments参数
    ticketticket

    Exchange

    交换机注解

    属性名称说明
    name交换机名称
    type类型可选:directfanouttopicheaders
    passive被动模式,默认false
    durable消息队列持久化,默认true
    autoDelete自动删除,默认false
    internal设置是否为rabbitmq内部使用, true表示是内部使用, false表示不是内部使用
    nowait是否非阻塞,默认false
    arguments参数
    ticketticket

    Connection

    连接注解

    属性名称说明
    poolName不为 null 时,无视其他属性,直接用该连接池配置。默认为null,如果hostportuserpassword都未设置,则获取默认的连接池。

    使用示例

    延时消息

    支付宝、微信支付都有一个逻辑,就是用户支付成功后会通过 HTTP 请求来通知我们的接口。

    接口如果没有按照约定的格式返回成功,会定时重试N次,每次延时不同,直到超过一定次数后,就不再重试。

    下面是一个支付通知的消费者示例:

    消费者类:

    <?php
    
    namespace PayService\Module\Pay\AMQP\PayNotify;
    
    use Imi\AMQP\Annotation\Consumer;
    use Imi\AMQP\Annotation\Exchange;
    use Imi\AMQP\Annotation\Queue;
    use Imi\AMQP\Base\BaseConsumer;
    use Imi\AMQP\Contract\IMessage;
    use Imi\AMQP\Enum\ConsumerResult;
    use Imi\Bean\Annotation\AnnotationManager;
    use Imi\Bean\Annotation\Bean;
    use Imi\Bean\BeanFactory;
    use Imi\Log\Log;
    use PhpAmqpLib\Exchange\AMQPExchangeType;
    use PhpAmqpLib\Message\AMQPMessage;
    
    #[
        Bean(name: 'PayNotifyConsumer'),
        Exchange(name: 'exchange-pay-notify'),
        Queue(name: 'pay-notify', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify-dead']),
    
        Exchange(name: 'exchange-pay-notify-step-1'),
        Queue(name: 'pay-notify-step-1', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 15000]),
        Exchange(name: 'exchange-pay-notify-step-2'),
        Queue(name: 'pay-notify-step-2', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 15000]),
        Exchange(name: 'exchange-pay-notify-step-3'),
        Queue(name: 'pay-notify-step-3', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 30000]),
        Exchange(name: 'exchange-pay-notify-step-4'),
        Queue(name: 'pay-notify-step-4', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 180000]),
        Exchange(name: 'exchange-pay-notify-step-5'),
        Queue(name: 'pay-notify-step-5', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 600000]),
        Exchange(name: 'exchange-pay-notify-step-6'),
        Queue(name: 'pay-notify-step-6', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 1200000]),
        Exchange(name: 'exchange-pay-notify-step-7'),
        Queue(name: 'pay-notify-step-7', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 1800000]),
        Exchange(name: 'exchange-pay-notify-step-8'),
        Queue(name: 'pay-notify-step-8', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 1800000]),
        Exchange(name: 'exchange-pay-notify-step-9'),
        Queue(name: 'pay-notify-step-9', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 1800000]),
        Exchange(name: 'exchange-pay-notify-step-10'),
        Queue(name: 'pay-notify-step-10', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 3600000]),
        Exchange(name: 'exchange-pay-notify-step-11'),
        Queue(name: 'pay-notify-step-11', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 10800000]),
        Exchange(name: 'exchange-pay-notify-step-12'),
        Queue(name: 'pay-notify-step-12', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 10800000]),
        Exchange(name: 'exchange-pay-notify-step-13'),
        Queue(name: 'pay-notify-step-13', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 10800000]),
        Exchange(name: 'exchange-pay-notify-step-14'),
        Queue(name: 'pay-notify-step-14', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 21600000]),
        Exchange(name: 'exchange-pay-notify-step-15'),
        Queue(name: 'pay-notify-step-15', arguments: ['x-dead-letter-exchange' => 'exchange-pay-notify', 'x-message-ttl' => 21600000]),
    
        Consumer(tag: 'payNotify', queue: 'pay-notify', exchange: 'exchange-pay-notify', message: \PayService\Module\Pay\AMQP\PayNotify\PayNotifyMessage::class),
    ]
    class PayNotifyConsumer extends BaseConsumer
    {
        /**
         * 定义消费者.
         *
         * @return void
         */
        protected function declareConsumer(): void
        {
            // 定义死信
            $this->channel->exchange_declare('exchange-pay-notify-dead', AMQPExchangeType::DIRECT, false, true, false);
            $this->channel->queue_declare('pay-notify-dead', false, true, false, false);
            $this->channel->queue_bind('pay-notify-dead', 'exchange-pay-notify-dead');
            $this->channel->queue_bind('pay-notify-dead', 'exchange-pay-notify-dead', 'pay-notify');
            $this->channel->queue_bind('pay-notify-dead', 'exchange-pay-notify-dead', 'pay-notify-dead');
            parent::declareConsumer();
        }
    
        /**
         * 消费任务
         *
         * @param \PayService\Module\Pay\AMQP\PayNotify\PayNotifyMessage $message
         *
         * @return void
         */
        protected function consume(IMessage $message)
        {
            try
            {
                // 是否需要重试
                $needRetry = false;
                // HTTP通知逻辑
                // ...
                // $needRetry = true; // 通知失败,需要重试
            }
            // 你也可以定义一个用于重试的异常,在上面逻辑中抛出
            // catch (RetryException $re)
            // {
            //     $needRetry = true;
            // }
            catch (\Throwable $th)
            {
                throw $th;
            }
            finally
            {
                if (isset($th))
                {
                    $result = ConsumerResult::REJECT;
                }
                elseif ($needRetry)
                {
                    $stepCount = \count(AnnotationManager::getClassAnnotations(BeanFactory::getObjectClass($this), Queue::class));
                    if ($message->getRetryCount() < $stepCount - 1)
                    {
                        $newMessage = clone $message;
                        $step = $message->getRetryCount() + 1;
                        $newMessage->setRetryCount($step);
                        $amqpMessage = $newMessage->getAMQPMessage();
                        $amqpMessage->set('delivery_mode', AMQPMessage::DELIVERY_MODE_PERSISTENT);
                        $amqpMessage->setBody($newMessage->getBody());
                        $queueName = 'pay-notify-step-' . $step;
                        $exchangeName = 'exchange-pay-notify-step-' . $step;
                        $this->channel->queue_bind($queueName, $exchangeName);
                        $this->channel->basic_publish($amqpMessage, $exchangeName);
                    }
                    else
                    {
                        $result = ConsumerResult::REJECT;
                    }
                }
            }
    
            return $result ?? ConsumerResult::ACK;
        }
    }

    PayNotifyMessage:

    <?php
    
    namespace PayService\Module\Pay\AMQP\PayNotify;
    
    use Imi\AMQP\Message;
    
    class PayNotifyMessage extends Message
    {
        /**
         * 支付订单ID.
         *
         * @var int
         */
        private $payOrderId;
    
        /**
         * 延时多少秒执行.
         *
         * @var int
         */
        private $retryCount = 0;
    
        public function __construct()
        {
            parent::__construct();
            $this->format = \Imi\Util\Format\Json::class;
        }
    
        /**
         * 设置主体数据.
         *
         * @param mixed $data
         *
         * @return self
         */
        public function setBodyData($data): self
        {
            foreach ($data as $k => $v)
            {
                $this->$k = $v;
            }
    
            return $this;
        }
    
        /**
         * 获取主体数据.
         *
         * @return mixed
         */
        public function getBodyData()
        {
            return [
                'payOrderId'    => $this->payOrderId,
                'retryCount'    => $this->retryCount,
            ];
        }
    
        /**
         * Get 支付订单ID.
         *
         * @return int
         */
        public function getPayOrderId()
        {
            return $this->payOrderId;
        }
    
        /**
         * Set 支付订单ID.
         *
         * @param int $payOrderId 支付订单ID
         *
         * @return self
         */
        public function setPayOrderId(int $payOrderId)
        {
            $this->payOrderId = $payOrderId;
    
            return $this;
        }
    
        /**
         * Get 延时多少秒执行.
         *
         * @return int
         */
        public function getRetryCount()
        {
            return $this->retryCount;
        }
    
        /**
         * Set 延时多少秒执行.
         *
         * @param int $retryCount 延时多少秒执行
         *
         * @return self
         */
        public function setRetryCount(int $retryCount)
        {
            $this->retryCount = $retryCount;
    
            return $this;
        }
    }