MQTT 服务端
目录
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议轻量、简单、开放且易于实现,这些特点使它的适用范围非常广泛,在很多场景下都能使用,包括受限的环境,如:机器与机器(M2M)通信和物联网(IoT)。它已广泛应用于通过卫星链路通信的传感器、偶尔拨号的医疗设备、智能家居以及一些小型化设备中。
仅支持 Swoole
安装:composer require imiphp/imi-mqtt
示例项目:composer create-project imiphp/project-mqtt:~2.1.0
配置
首先,服务器配置的type
设为MQTTServer
,并且定义好控制器。
控制器需要继承Imi\Server\MQTT\BaseMQTTController
类,并且实现方法。
如果你是主服务器,配置如下:
// Main server definition: an MQTT server whose packets are handled by the controller below.
'mainServer' => [
'namespace' => 'ImiApp\MQTTServer',
'type' => 'MQTTServer',
'host' => '127.0.0.1',
'port' => 8081,
// Controller class that processes the MQTT packets for this server.
'controller' => \ImiApp\MQTTServer\Controller\MQTTController::class,
// 'configs' => [
// // Enable MQTTS by configuring the certificate and private key files
// 'ssl_cert_file' => dirname(__DIR__) . '/ssl/server.crt',
// 'ssl_key_file' => dirname(__DIR__) . '/ssl/server.key',
// ],
],
如果你是子服务器,配置如下:
// Sub-server (port listener) configuration
'subServers' => [
// 'xxxServer' is a placeholder name for the sub-server entry.
'xxxServer' => [
'namespace' => 'ImiApp\MQTTServer',
'type' => 'MQTTServer',
'host' => '127.0.0.1',
'port' => 8081,
// Controller class that processes the MQTT packets for this server.
'controller' => \ImiApp\MQTTServer\Controller\MQTTController::class,
// 'configs' => [
// // Enable MQTTS by configuring the certificate and private key files
// 'ssl_cert_file' => dirname(__DIR__) . '/ssl/server.crt',
// 'ssl_key_file' => dirname(__DIR__) . '/ssl/server.key',
// ],
],
],
通讯数据包类
imi-mqtt
基于 binsoul/net-mqtt
开发,使用的都是这个包中的数据包结构类。
类名一般是BinSoul\Net\Mqtt\Packet\XXX
如:\BinSoul\Net\Mqtt\Packet\SubscribeRequestPacket
控制器
<?php
namespace ImiApp\MQTTServer\Controller;
use Imi\Server\MQTT\BaseMQTTController;
use Imi\Server\MQTT\Message\ReceiveData;
use BinSoul\Net\Mqtt\Packet\PublishAckPacket;
use BinSoul\Net\Mqtt\Packet\PingRequestPacket;
use BinSoul\Net\Mqtt\Packet\PingResponsePacket;
use BinSoul\Net\Mqtt\Packet\ConnectRequestPacket;
use BinSoul\Net\Mqtt\Packet\PublishReleasePacket;
use BinSoul\Net\Mqtt\Packet\PublishRequestPacket;
use BinSoul\Net\Mqtt\Packet\ConnectResponsePacket;
use BinSoul\Net\Mqtt\Packet\PublishCompletePacket;
use BinSoul\Net\Mqtt\Packet\PublishReceivedPacket;
use BinSoul\Net\Mqtt\Packet\SubscribeRequestPacket;
use BinSoul\Net\Mqtt\Packet\DisconnectRequestPacket;
use BinSoul\Net\Mqtt\Packet\SubscribeResponsePacket;
use BinSoul\Net\Mqtt\Packet\UnsubscribeRequestPacket;
use BinSoul\Net\Mqtt\Packet\UnsubscribeResponsePacket;
use Imi\Server\Server;
/**
 * Example MQTT controller.
 *
 * Everything in this class is sample code; write the real handlers
 * according to the needs of your own project.
 */
class MQTTController extends BaseMQTTController
{
    /**
     * Handles a connect request.
     *
     * The demo accepts exactly one hard-coded username/password pair.
     *
     * @param \BinSoul\Net\Mqtt\Packet\ConnectRequestPacket $request     incoming connect packet
     * @param \Imi\Server\MQTT\Message\ReceiveData          $receiveData data of the received message
     *
     * @return \BinSoul\Net\Mqtt\Packet\ConnectResponsePacket|null
     */
    public function connect(ConnectRequestPacket $request, ReceiveData $receiveData): ?ConnectResponsePacket
    {
        // Demo credentials only; replace with a real authentication check.
        $authenticated = $request->getUsername() === 'root'
            && $request->getPassword() === '123456';

        $reply = new ConnectResponsePacket();
        // Return code 0 when the credentials match, 4 otherwise
        // (MQTT 3.1.1 CONNACK: 0 = accepted, 4 = bad user name or password).
        $reply->setReturnCode($authenticated ? 0 : 4);

        return $reply;
    }

    /**
     * Handles a disconnect request. The example does nothing here.
     *
     * @param \BinSoul\Net\Mqtt\Packet\DisconnectRequestPacket $request     incoming disconnect packet
     * @param \Imi\Server\MQTT\Message\ReceiveData             $receiveData data of the received message
     *
     * @return void
     */
    public function disconnect(DisconnectRequestPacket $request, ReceiveData $receiveData): void
    {
        // Intentionally empty in this example.
    }

    /**
     * Handles a ping request by answering with a ping response.
     *
     * @param \BinSoul\Net\Mqtt\Packet\PingRequestPacket $request     incoming ping packet
     * @param \Imi\Server\MQTT\Message\ReceiveData       $receiveData data of the received message
     *
     * @return \BinSoul\Net\Mqtt\Packet\PingResponsePacket|null
     */
    public function ping(PingRequestPacket $request, ReceiveData $receiveData): ?PingResponsePacket
    {
        return new PingResponsePacket();
    }

    /**
     * Handles a publish request.
     *
     * For demonstration purposes the reply packet type is selected by the
     * topic name ('a' to 'd'); any other topic raises an exception.
     *
     * @param \BinSoul\Net\Mqtt\Packet\PublishRequestPacket $request     incoming publish packet
     * @param \Imi\Server\MQTT\Message\ReceiveData          $receiveData data of the received message
     *
     * @return \BinSoul\Net\Mqtt\Packet\PublishAckPacket|\BinSoul\Net\Mqtt\Packet\PublishReceivedPacket|\BinSoul\Net\Mqtt\Packet\PublishReleasePacket|\BinSoul\Net\Mqtt\Packet\PublishCompletePacket|null
     *
     * @throws \RuntimeException when the topic is not one of the demo topics
     */
    public function publish(PublishRequestPacket $request, ReceiveData $receiveData)
    {
        $topic = $request->getTopic();

        // Demo mapping: topic name => class of the packet sent back.
        $replyClasses = [
            'a' => PublishAckPacket::class,
            'b' => PublishReceivedPacket::class,
            'c' => PublishReleasePacket::class,
            'd' => PublishCompletePacket::class,
        ];

        if (!isset($replyClasses[$topic])) {
            throw new \RuntimeException('Unknown topic ' . $topic);
        }

        $replyClass = $replyClasses[$topic];
        $reply = new $replyClass();
        // The reply carries the same packet identifier as the request.
        $reply->setIdentifier($request->getIdentifier());

        // TODO: implement pushing the message to the subscribers yourself

        return $reply;
    }

    /**
     * Handles a subscribe request.
     *
     * Builds the subscribe response and, as a demo, sends a test message
     * on topic 'a' to the client that issued the request.
     *
     * @param \BinSoul\Net\Mqtt\Packet\SubscribeRequestPacket $request     incoming subscribe packet
     * @param \Imi\Server\MQTT\Message\ReceiveData            $receiveData data of the received message
     *
     * @return \BinSoul\Net\Mqtt\Packet\SubscribeResponsePacket|null
     */
    public function subscribe(SubscribeRequestPacket $request, ReceiveData $receiveData): ?SubscribeResponsePacket
    {
        $reply = new SubscribeResponsePacket();
        $reply->setIdentifier($request->getIdentifier());
        // The demo always answers with a single return code 0.
        $reply->setReturnCodes([0]);

        // Demo: send a test message to the client that just subscribed.
        $demoMessage = new PublishRequestPacket();
        $demoMessage->setPayload('test');
        $demoMessage->setTopic('a');
        Server::send($demoMessage, $receiveData->getClientId());

        // TODO: implement the actual subscription handling yourself

        return $reply;
    }

    /**
     * Handles an unsubscribe request.
     *
     * @param \BinSoul\Net\Mqtt\Packet\UnsubscribeRequestPacket $request     incoming unsubscribe packet
     * @param \Imi\Server\MQTT\Message\ReceiveData              $receiveData data of the received message
     *
     * @return \BinSoul\Net\Mqtt\Packet\UnsubscribeResponsePacket|null
     */
    public function unsubscribe(UnsubscribeRequestPacket $request, ReceiveData $receiveData): ?UnsubscribeResponsePacket
    {
        $reply = new UnsubscribeResponsePacket();
        // The reply carries the same packet identifier as the request.
        $reply->setIdentifier($request->getIdentifier());

        // TODO: implement the actual unsubscription handling yourself

        return $reply;
    }
}