Kafka
目录
介绍
支持在 imi 框架中使用 Kafka 客户端
支持消息发布和消费
本组件基于 龙之言 组织的 longlang/phpkafka 组件,该组件由宇润主导开发。
Composer
本项目可以使用composer安装,遵循psr-4自动加载规则,在你的 composer.json
中加入下面的内容:
{
"require": {
"imiphp/imi-kafka": "~2.1.0"
}
}
然后执行 composer update
安装。
使用说明
可以参考 example
目录示例,包括完整的消息发布和消费功能。
在项目 config/config.php
中配置:
[
'components' => [
// 引入组件
'Kafka' => 'Imi\Kafka',
],
]
连接池配置:(Swoole)
[
'pools' => [
'kafka' => [
'pool' => [
'class' => \Imi\Kafka\Pool\KafkaCoroutinePool::class,
'config' => [
'maxResources' => 10,
'minResources' => 1,
],
],
'resource' => [
'bootstrapServers' => KAFKA_BOOTSTRAP_SERVERS,
'groupId' => 'test',
// 其它配置请参考:https://github.com/longyan/phpkafka/blob/master/doc/consumer.md#%E9%85%8D%E7%BD%AE%E5%8F%82%E6%95%B0
],
],
]
]
连接配置:(Workerman)
'kafka' => [
'connections' => [
'kafka' => [
'bootstrapServers' => KAFKA_BOOTSTRAP_SERVERS,
'groupId' => 'test2',
],
],
],
默认连接池:
[
'beans' => [
'Kafka' => [
'defaultPoolName' => 'kafka',
],
],
]
生产者
use Imi\Kafka\Pool\KafkaPool;
use longlang\phpkafka\Producer\ProduceMessage;
// 获取生产者对象
$producer = KafkaPool::getInstance();
// 发送
$producer->send('主题 Topic', '消息内容');
// send 方法定义
// public function send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null, ?int $brokerId = null): void
// 批量发送
$producer->sendBatch([
new ProduceMessage($topic, 'v1', 'k1'),
new ProduceMessage($topic, 'v2', 'k2'),
]);
// sendBatch 方法定义
// public function sendBatch(ProduceMessage[] $messages, ?int $brokerId = null): void
消费者
消费者类:
<?php
namespace ImiApp\Kafka\Test;
use Imi\Bean\Annotation\Bean;
use Imi\Kafka\Annotation\Consumer;
use Imi\Kafka\Base\BaseConsumer;
use Imi\Redis\Redis;
use longlang\phpkafka\Consumer\ConsumeMessage;
/**
* @Bean("TestConsumer")
* @Consumer(topic="queue-imi-1", groupId="test-consumer")
*/
class TestConsumer extends BaseConsumer
{
/**
* 消费任务
*/
protected function consume(ConsumeMessage $message): void
{
$messageValue = $message->getValue();
}
}
消费进程:
<?php
namespace ImiApp\Process;
use Imi\Aop\Annotation\Inject;
use Imi\App;
use Imi\Kafka\Contract\IConsumer;
use Imi\Process\Annotation\Process;
use Imi\Process\BaseProcess;
/**
* @Process(name="TestProcess")
*/
class TestProcess extends BaseProcess
{
/**
* @Inject("TestConsumer")
*
* @var \ImiApp\Kafka\Test\TestConsumer
*/
protected $testConsumer;
public function run(\Swoole\Process $process): void
{
$this->runConsumer($this->testConsumer);
\Swoole\Coroutine::yield();
}
private function runConsumer(IConsumer $consumer): void
{
go(function () use ($consumer) {
try
{
$consumer->run();
}
catch (\Throwable $th)
{
\Imi\Log\Log::error($th);
sleep(3);
$this->runConsumer($consumer);
}
});
}
}
注解说明
@Consumer
消费者注解
属性名称 | 说明 |
---|---|
topic | 主题名称,支持字符串或字符串数组 |
groupId | 分组ID |
poolName | 连接池名称,不传则使用配置中默认的 |
队列组件支持
本组件额外实现了 imiphp/imi-queue 的接口,可以用 Queue 组件的 API 进行调用。
只需要将队列驱动配置为:KafkaQueueDriver
配置示例:
[
'components' => [
'Kafka' => 'Imi\Kafka',
],
'beans' => [
'AutoRunProcessManager' => [
'processes' => [
// 加入队列消费进程,非必须,你也可以自己写进程消费
'QueueConsumer',
],
],
'imiQueue' => [
// 默认队列
'default' => 'QueueTest1',
// 队列列表
'list' => [
// 队列名称
'QueueTest1' => [
// 使用的队列驱动
'driver' => 'KafkaQueueDriver',
// 消费协程数量
'co' => 1,
// 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准
'process' => 1,
// 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)
'timespan' => 0.1,
// 进程分组名称
'processGroup' => 'a',
// 自动消费
'autoConsumer' => true,
// 消费者类
'consumer' => 'QueueTestConsumer',
// 驱动类所需要的参数数组
'config' => [
// Kafka 连接池名称
'poolName' => 'kafka',
// 分组ID,如果不传或为null则使用连接池中的配置
'groupId' => 'g1',
],
],
],
],
]
]
消费者类写法,与imi-queue
组件用法一致。