MQTT 服务端

    目录

    MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议轻量、简单、开放且易于实现,这些特点使它的适用范围非常广泛,包括各种受限的环境,例如机器与机器(M2M)通信和物联网(IoT)。它已在通过卫星链路通信的传感器、偶尔拨号的医疗设备、智能家居以及一些小型化设备中得到广泛使用。

    仅支持 Swoole

    安装:composer require imiphp/imi-mqtt

    示例项目:composer create-project imiphp/project-mqtt:~2.1.0

    配置

    首先,服务器配置的type设为MQTTServer,并且定义好控制器。

    控制器需要继承Imi\Server\MQTT\BaseMQTTController类,并且实现方法。

    如果你是主服务器,配置如下:

    // Main server configuration for an MQTT server
    'mainServer'    =>    [
        'namespace'     =>    'ImiApp\MQTTServer',
        // Server type of this entry
        'type'          =>    'MQTTServer',
        'host'          =>    '127.0.0.1',
        'port'          =>    8081,
        // Controller class that handles the MQTT packets
        'controller'    =>  \ImiApp\MQTTServer\Controller\MQTTController::class,
        // 'configs'       =>    [
        //     // Enable MQTTS by configuring the certificate
        //     'ssl_cert_file'     =>  dirname(__DIR__) . '/ssl/server.crt',
        //     'ssl_key_file'      =>  dirname(__DIR__) . '/ssl/server.key',
        // ],
    ],

    如果你是子服务器,配置如下:

    // Sub-server (port listener) configuration
    'subServers'        =>    [
        // The key is the name of the sub-server
        'xxxServer'    =>    [
            'namespace'     =>    'ImiApp\MQTTServer',
            // Server type of this entry
            'type'          =>    'MQTTServer',
            'host'          =>    '127.0.0.1',
            'port'          =>    8081,
            // Controller class that handles the MQTT packets
            'controller'    =>  \ImiApp\MQTTServer\Controller\MQTTController::class,
            // 'configs'       =>    [
            //     // Enable MQTTS by configuring the certificate
            //     'ssl_cert_file'     =>  dirname(__DIR__) . '/ssl/server.crt',
            //     'ssl_key_file'      =>  dirname(__DIR__) . '/ssl/server.key',
            // ],
        ],
    ],

    通讯数据包类

    imi-mqtt 基于 binsoul/net-mqtt 开发,使用的都是这个包中的数据包结构类。

    类名一般是BinSoul\Net\Mqtt\Packet\XXX

    如:\BinSoul\Net\Mqtt\Packet\SubscribeRequestPacket

    控制器

    <?php
    namespace ImiApp\MQTTServer\Controller;
    
    use Imi\Server\MQTT\BaseMQTTController;
    use Imi\Server\MQTT\Message\ReceiveData;
    use BinSoul\Net\Mqtt\Packet\PublishAckPacket;
    use BinSoul\Net\Mqtt\Packet\PingRequestPacket;
    use BinSoul\Net\Mqtt\Packet\PingResponsePacket;
    use BinSoul\Net\Mqtt\Packet\ConnectRequestPacket;
    use BinSoul\Net\Mqtt\Packet\PublishReleasePacket;
    use BinSoul\Net\Mqtt\Packet\PublishRequestPacket;
    use BinSoul\Net\Mqtt\Packet\ConnectResponsePacket;
    use BinSoul\Net\Mqtt\Packet\PublishCompletePacket;
    use BinSoul\Net\Mqtt\Packet\PublishReceivedPacket;
    use BinSoul\Net\Mqtt\Packet\SubscribeRequestPacket;
    use BinSoul\Net\Mqtt\Packet\DisconnectRequestPacket;
    use BinSoul\Net\Mqtt\Packet\SubscribeResponsePacket;
    use BinSoul\Net\Mqtt\Packet\UnsubscribeRequestPacket;
    use BinSoul\Net\Mqtt\Packet\UnsubscribeResponsePacket;
    use Imi\Server\Server;
    
    /**
     * Sample MQTT controller.
     *
     * Everything in this class is illustrative only; write the real handlers
     * according to the needs of your own project.
     */
    class MQTTController extends BaseMQTTController
    {
        /**
         * Handles a connect request.
         *
         * Accepts the connection only for the hard-coded demo credentials
         * (user name "root", password "123456").
         *
         * @param \BinSoul\Net\Mqtt\Packet\ConnectRequestPacket $request
         * @param \Imi\Server\MQTT\Message\ReceiveData $receiveData
         * @return \BinSoul\Net\Mqtt\Packet\ConnectResponsePacket|null
         */
        public function connect(ConnectRequestPacket $request, ReceiveData $receiveData): ?ConnectResponsePacket
        {
            $password = $request->getPassword();

            // The password is a secret supplied by the client, so it is compared
            // with hash_equals() rather than ===, which avoids leaking its content
            // through the comparison time. The is_string() guard keeps the result
            // identical to a strict comparison for non-string values.
            $success = 'root' === $request->getUsername()
                && \is_string($password)
                && \hash_equals('123456', $password);

            $response = new ConnectResponsePacket();
            // 0 is sent when the credentials match, 4 when they do not
            // (MQTT 3.1.1 CONNACK: 0 = accepted, 4 = bad user name or password).
            $response->setReturnCode($success ? 0 : 4);

            return $response;
        }

        /**
         * Handles a disconnect request.
         *
         * @param \BinSoul\Net\Mqtt\Packet\DisconnectRequestPacket $request
         * @param \Imi\Server\MQTT\Message\ReceiveData $receiveData
         * @return void
         */
        public function disconnect(DisconnectRequestPacket $request, ReceiveData $receiveData): void
        {
            // Intentionally empty: the example performs no work on disconnect.
        }

        /**
         * Handles a ping request by answering with a ping response.
         *
         * @param \BinSoul\Net\Mqtt\Packet\PingRequestPacket $request
         * @param \Imi\Server\MQTT\Message\ReceiveData $receiveData
         * @return \BinSoul\Net\Mqtt\Packet\PingResponsePacket|null
         */
        public function ping(PingRequestPacket $request, ReceiveData $receiveData): ?PingResponsePacket
        {
            return new PingResponsePacket();
        }

        /**
         * Handles a publish request.
         *
         * The response packet class is selected by the topic name
         * ("a", "b", "c" or "d"), presumably to showcase each allowed return
         * type. The response carries the identifier of the request.
         *
         * @param \BinSoul\Net\Mqtt\Packet\PublishRequestPacket $request
         * @param \Imi\Server\MQTT\Message\ReceiveData $receiveData
         * @return \BinSoul\Net\Mqtt\Packet\PublishAckPacket|\BinSoul\Net\Mqtt\Packet\PublishReceivedPacket|\BinSoul\Net\Mqtt\Packet\PublishReleasePacket|\BinSoul\Net\Mqtt\Packet\PublishCompletePacket|null
         * @throws \RuntimeException when the topic is none of the four known ones
         */
        public function publish(PublishRequestPacket $request, ReceiveData $receiveData)
        {
            switch($request->getTopic())
            {
                case 'a':
                    $response = new PublishAckPacket();
                    break;
                case 'b':
                    $response = new PublishReceivedPacket();
                    break;
                case 'c':
                    $response = new PublishReleasePacket();
                    break;
                case 'd':
                    $response = new PublishCompletePacket();
                    break;
                default:
                    throw new \RuntimeException('Unknown topic ' . $request->getTopic());
            }
            // Echo the request identifier back in the response.
            $response->setIdentifier($request->getIdentifier());
            // TODO: implement delivery of the message to the subscribers yourself
            return $response;
        }

        /**
         * Handles a subscribe request.
         *
         * Answers with a subscribe response carrying the request identifier and,
         * as a demonstration, sends a sample publish packet to the requesting client.
         *
         * @param \BinSoul\Net\Mqtt\Packet\SubscribeRequestPacket $request
         * @param \Imi\Server\MQTT\Message\ReceiveData $receiveData
         * @return \BinSoul\Net\Mqtt\Packet\SubscribeResponsePacket|null
         */
        public function subscribe(SubscribeRequestPacket $request, ReceiveData $receiveData): ?SubscribeResponsePacket
        {
            $response = new SubscribeResponsePacket();
            $response->setIdentifier($request->getIdentifier());
            // NOTE(review): exactly one return code (0) is always sent; if a request
            // can carry several topic filters, one code per filter is presumably
            // required - confirm against the packet API and the MQTT specification.
            $response->setReturnCodes([0]);

            // Demo only: send a publish packet with payload "test" on topic "a"
            // to the client id taken from the received data.
            $publishData = new PublishRequestPacket();
            $publishData->setPayload('test');
            $publishData->setTopic('a');
            Server::send($publishData, $receiveData->getClientId());

            // TODO: implement the actual subscription bookkeeping yourself
            return $response;
        }

        /**
         * Handles an unsubscribe request.
         *
         * Answers with an unsubscribe response carrying the request identifier.
         *
         * @param \BinSoul\Net\Mqtt\Packet\UnsubscribeRequestPacket $request
         * @param \Imi\Server\MQTT\Message\ReceiveData $receiveData
         * @return \BinSoul\Net\Mqtt\Packet\UnsubscribeResponsePacket|null
         */
        public function unsubscribe(UnsubscribeRequestPacket $request, ReceiveData $receiveData): ?UnsubscribeResponsePacket
        {
            $response = new UnsubscribeResponsePacket();
            $response->setIdentifier($request->getIdentifier());
            // TODO: implement the actual unsubscription yourself

            return $response;
        }

    }