Kafka

    目录

    介绍

    支持在 imi 框架中使用 Kafka 客户端

    支持消息发布和消费

    本组件基于 龙之言 组织的 longlang/phpkafka 组件,该组件由宇润主导开发。

    Composer

    本项目可以使用composer安装,遵循psr-4自动加载规则,在你的 composer.json 中加入下面的内容:

    {
        "require": {
            "imiphp/imi-kafka": "~2.1.0"
        }
    }

    然后执行 composer update 安装。

    使用说明

    可以参考 example 目录示例,包括完整的消息发布和消费功能。

    在项目 config/config.php 中配置:

    [
        'components'    =>  [
            // 引入组件
            'Kafka'   =>  'Imi\Kafka',
        ],
    ]

    连接池配置:(Swoole)

    [
        'pools'    =>    [
            'kafka'    => [
                'pool'    => [
                    'class'        => \Imi\Kafka\Pool\KafkaCoroutinePool::class,
                    'config'       => [
                        'maxResources'    => 10,
                        'minResources'    => 1,
                    ],
                ],
                'resource'    => [
                    'bootstrapServers' => KAFKA_BOOTSTRAP_SERVERS,
                    'groupId'          => 'test',
                    // 其它配置请参考:https://github.com/longyan/phpkafka/blob/master/doc/consumer.md#%E9%85%8D%E7%BD%AE%E5%8F%82%E6%95%B0
                ],
            ],
        ]
    ]

    连接配置:(Workerman)

    'kafka' => [
        'connections' => [
            'kafka'    => [
                'bootstrapServers' => KAFKA_BOOTSTRAP_SERVERS,
                'groupId'          => 'test2',
            ],
        ],
    ],

    默认连接池:

    [
        'beans' =>  [
            'Kafka'  =>  [
                'defaultPoolName'   =>  'kafka',
            ],
        ],
    ]

    生产者

    use Imi\Kafka\Pool\KafkaPool;
    use longlang\phpkafka\Producer\ProduceMessage;
    
    // 获取生产者对象
    $producer = KafkaPool::getInstance();
    
    // 发送
    $producer->send('主题 Topic', '消息内容');
    // send 方法定义
    // public function send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null, ?int $brokerId = null): void
    
    // 批量发送
    $producer->sendBatch([
        new ProduceMessage($topic, 'v1', 'k1'),
        new ProduceMessage($topic, 'v2', 'k2'),
    ]);
    // sendBatch 方法定义
    // public function sendBatch(ProduceMessage[] $messages, ?int $brokerId = null): void

    消费者

    消费者类:

    <?php
    
    namespace ImiApp\Kafka\Test;
    
    use Imi\Bean\Annotation\Bean;
    use Imi\Kafka\Annotation\Consumer;
    use Imi\Kafka\Base\BaseConsumer;
    use Imi\Redis\Redis;
    use longlang\phpkafka\Consumer\ConsumeMessage;
    
    /**
     * @Bean("TestConsumer")
     * @Consumer(topic="queue-imi-1", groupId="test-consumer")
     */
    class TestConsumer extends BaseConsumer
    {
        /**
         * 消费任务
         */
        protected function consume(ConsumeMessage $message): void
        {
            $messageValue = $message->getValue();
        }
    }

    消费进程:

    <?php
    
    namespace ImiApp\Process;
    
    use Imi\Aop\Annotation\Inject;
    use Imi\App;
    use Imi\Kafka\Contract\IConsumer;
    use Imi\Process\Annotation\Process;
    use Imi\Process\BaseProcess;
    
    /**
     * @Process(name="TestProcess")
     */
    class TestProcess extends BaseProcess
    {
        /**
         * @Inject("TestConsumer")
         *
         * @var \ImiApp\Kafka\Test\TestConsumer
         */
        protected $testConsumer;
    
        public function run(\Swoole\Process $process): void
        {
            $this->runConsumer($this->testConsumer);
            \Swoole\Coroutine::yield();
        }
    
        private function runConsumer(IConsumer $consumer): void
        {
            go(function () use ($consumer) {
                try
                {
                    $consumer->run();
                }
                catch (\Throwable $th)
                {
                    \Imi\Log\Log::error($th);
                    sleep(3);
                    $this->runConsumer($consumer);
                }
            });
        }
    }

    注解说明

    @Consumer

    消费者注解

    属性名称说明
    topic主题名称,支持字符串或字符串数组
    groupId分组ID
    poolName连接池名称,不传则使用配置中默认的

    队列组件支持

    本组件额外实现了 imiphp/imi-queue 的接口,可以用 Queue 组件的 API 进行调用。

    只需要将队列驱动配置为:KafkaQueueDriver

    配置示例:

    [
        'components'    =>  [
            'Kafka'  =>  'Imi\Kafka',
        ],
        'beans' =>  [
            'AutoRunProcessManager' =>  [
                'processes' =>  [
                    // 加入队列消费进程,非必须,你也可以自己写进程消费
                    'QueueConsumer',
                ],
            ],
            'imiQueue'  => [
                // 默认队列
                'default'   => 'QueueTest1',
                // 队列列表
                'list'  => [
                    // 队列名称
                    'QueueTest1' => [
                        // 使用的队列驱动
                        'driver'        => 'KafkaQueueDriver',
                        // 消费协程数量
                        'co'            => 1,
                        // 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准
                        'process'       => 1,
                        // 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)
                        'timespan'      => 0.1,
                        // 进程分组名称
                        'processGroup'  => 'a',
                        // 自动消费
                        'autoConsumer'  => true,
                        // 消费者类
                        'consumer'      => 'QueueTestConsumer',
                        // 驱动类所需要的参数数组
                        'config'        => [
                            // Kafka 连接池名称
                            'poolName' => 'kafka',
                            // 分组ID,如果不传或为null则使用连接池中的配置
                            'groupId'  => 'g1',
                        ],
                    ],
                ],
            ],
        ]
    ]

    消费者类写法,与imi-queue组件用法一致。