mqtt


一、MQTT说明

1.1、mqtt文档

官网:https://mqtt.org/
仅供参考:https://www.emqx.com/zh/mqtt

1.2、MQTT消息服务

MQTT规定了3种消息等级:

1.QoS 0:

消息最多传递一次,不需要客户端给与回复,如果当时客户端不可用,则会丢失该消息。

2.QoS 1:

a、消息传递至少 1 次,发布者会发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内要收到接收者的应答,发布者若没收到应答,会将消息的 DUP 置为 1 并重发消息
b、所以Qos 1消息级别取决于接受者在规定时间内给与发布者反馈,若没有反馈,则会再次接受到消息。

3.QoS

a、消息仅传送一次,发布者发布 QoS 为 2 的消息之后,会将发布的消息储存起来并等待接收者回复 PUBREC 的消息,发送者收到 PUBREC 消息后,它就可以安全丢弃掉之前的发布消息,因为它已经知道接收者成功收到了消息。
b、发布者会保存 PUBREC 消息并应答一个 PUBREL,等待接收者回复 PUBCOMP 消息,当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。
c、QoS 2 消息的核心是接收者给发布者反馈两次接收结果,相当于一次接收,一次确认接收。

1.3、归纳

  1. QoS 0 消息只发一次,不在乎是否被别的客户端收到,只要发送了就算结束。
  2. QoS 1 消息需要消息接收者在规定时间内给予反馈,结束的标志是在发送后规定时间内收到反馈,否则就会一直发送。
  3. QoS 2 消息需要发送者和接收者双方互相进行消息确认,只要有一方没有确定就不会结束。

二、MQTT环境搭建

有2种方式
1、原生mqtt
2、rabbitmq的mqtt插件

第一种

centos、Ubuntu 安装mqtt和使用https://blog.csdn.net/qq_44413835/article/details/120606097

mqtt客户端下载
我是使用MQTTBox: https://dl.pconline.com.cn/download/1323304.html

mqttx下载:https://mqttx.app/zh

第二种

安装rabbitmq在开启mqtt插件-好处rabbitmq有web管理平台
注:如果不会使用rabbitmq查看我的消息队列的专栏,里面有集成篇

docker安装rabbitmq: https://blog.csdn.net/qq_44413835/article/details/123648048

进入docker-rabbitmq容器

docker exec  -it rabbitmq  /bin/bash

安装后开启mqq插件

# 打开rabbitmq_mqtt
rabbitmq-plugins enable rabbitmq_mqtt

# 打开rabbitmq_web_mqtt
rabbitmq-plugins enable rabbitmq_web_mqtt

三、Springboot集成原生mqtt

1.1、项目依赖

 <!--集成MQTT-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--开启流支持-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>

<!--gson序列化工具-->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

1.2、application.yml配置

mqtt:
  url: tcp://ip:1883
  username: guest
  password: guest
  keep-alive: 30
  connection-timeout: 5000
  # 发送的主题,#为通配符
  sender:
    clientId: serverId
    defaultTopic: dji-StoA/#,autel-StoA/#
  # 默认接受消息的主题
  receiver:
    clientId: ${random.value}
    defaultTopic: dji-AtoS,autel-AtoS,willTopic
  deafultTopic : default

1.3、mqtt配置类

MqttConfig

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.jeecg.modules.mqtt.consumer.MqttMsgConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.security.SecureRandom;

@Configuration
@Slf4j
@IntegrationComponentScan
public class MqttConfig {
    /**
     * 发布的bean名称
     * outbound 是发送消息到mqtt broker
     * inbound是接收mqtt消息进行处理
     */
    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";


    /**客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱"消息*/
    private static final byte[] WILL_DATA= "todoxxxx".getBytes();


    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.url}")
    private String url;

    @Value("${mqtt.sender.clientId}")
    private String senderClientId;

    @Value("${mqtt.sender.defaultTopic}")
    private String senderDefaultTopic;

    @Value("${mqtt.receiver.clientId}")
    private String receiverClientId;

    @Value("${mqtt.receiver.defaultTopic}")
    private String receiverDefaultTopic;


    /**
     * MQTT连接器选项
     */
    @Bean
    public MqttConnectOptions getSenderMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置连接的用户名
        if (StrUtil.isNotBlank(username)) {
            options.setUserName(username);
        }
        // 设置连接的密码
        options.setPassword(password.toCharArray());
        // 设置连接的地址
        options.setServerURIs(new String[]{url});
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(15);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
        // 但这个方法并没有重连的机制
        options.setKeepAliveInterval(30);
        // 客户端掉线后 服务器端不会清除session,当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息
        options.setCleanSession(false);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
//        options.setWill("willTopic", WILL_DATA, 2, false);
        return options;
    }

    /**
     * MQTT客户端
     */
    @Bean
    public MqttPahoClientFactory senderMqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getSenderMqttConnectOptions());
        return factory;
    }


    /**
     * 发送通道配置 默认主题
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
    public MessageHandler mqttOutbound() {

        //clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线
        String clientIdStr  = receiverClientId + new SecureRandom().nextInt();
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientIdStr, senderMqttClientFactory());
        //async如果为true,则调用方不会阻塞。而是在发送消息时等待传递确认。默认值为false(发送将阻塞,直到确认发送)
        messageHandler.setAsync(true);
        messageHandler.setAsyncEvents(true);
        messageHandler.setDefaultTopic(senderDefaultTopic);
        messageHandler.setDefaultQos(1);
        return messageHandler;
    }



    /**
     * MQTT发送通道
     */
    @Bean(name = CHANNEL_NAME_OUT)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT接收通道
     */
    @Bean(name = CHANNEL_NAME_IN)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }


    /**
     * MQTT配置监听的 topic 支持通配符
     */
    @Bean
    public MessageProducer inbound() {
        //clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线
        String serverIdStr = senderClientId + new SecureRandom().nextInt(10);

        String[] receiverTopics = receiverDefaultTopic.split(",");
        // 可以同时消费(订阅)多个Topic
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        serverIdStr, senderMqttClientFactory(),
                        receiverTopics);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        // 设置订阅通道
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * MQTT消息处理器(消费者)
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
        return new MqttMsgConsumer();
    }


    /**
     * @desc mqtt连接失败或者订阅失败时,触发MqttConnectionFailedEvent事件
     * @param event
     * @return void
     */
    @EventListener(MqttConnectionFailedEvent.class)
    public void mqttConnectionFailedEvent(MqttConnectionFailedEvent event) {
        log.error("-----mqtt----连接mqtt失败: date={}, hostUrl={}, username={}, error={}",
                DateUtil.now(), url, username, event.getCause().getMessage());
    }

    /**
     * @desc 当async和async事件(async-events)都为true时,将发出MqttMessageSentEvent
     * 它包含消息、主题、客户端库生成的消息id、clientId和clientInstance(每次连接客户端时递增)
     * @param event
     * @return void
     */
    @EventListener(MqttMessageSentEvent.class)
    public void mqttMessageSentEvent(MqttMessageSentEvent event) {
//        log.info("-----mqtt----发送信息: date={}, info={}, ", DateUtil.now(), event.toString());
    }

    /**
     * @desc 当async和async事件(async-events)都为true时,将发出MqttMessageDeliveredEvent
     * 当客户端库确认传递时,将发出MqttMessageDeliveredEvent。它包含messageId、clientId和clientInstance,使传递与发送相关。
     * @param event
     * @return void
     */
    @EventListener(MqttMessageDeliveredEvent.class)
    public void mqttMessageDeliveredEvent(MqttMessageDeliveredEvent event) {
//        log.info("-----mqtt----发送成功信息: date={}, info={}", DateUtil.now(), event.toString());
    }

    /**
     * @desc 成功订阅到主题,MqttSubscribedEvent事件就会被触发(多个主题,多次触发)
     * @param event
     * @return void
     */
    @EventListener(MqttSubscribedEvent.class)
    public void mqttSubscribedEvent(MqttSubscribedEvent event) {
//        log.info("-----mqtt----订阅成功信息: date={}, info={}", DateUtil.now(), event.toString());
    }
}

1.4、mqtt消费接口

MqttMsgConsumer

/**
 * MQTT接收消息
 */
@Service
@Slf4j
public class MqttMsgConsumer implements MessageHandler {

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        String topic =  Objects.requireNonNull(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)).toString();
        String payload =  message.getPayload().toString();

        System.err.println("主题:" + topic + " 接收的数据:" + payload);
        
    }
}

1.5、mqtt发布接口

IMqttSender

import org.jeecg.modules.mqtt.config.MqttConfig;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * MQTT发送网关
 * 通过接口将数据传递到集成流
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {

    /**
     * 发送信息到MQTT服务器
     *
     * @param data 发送的文本
     */
    void sendToMqtt(String data);

    /**
     * 发送信息到MQTT服务器
     *
     * @param topic   主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 发送信息到MQTT服务器
     *
     * @param topic   主题
     * @param qos     对消息处理的几种机制。
     *                0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
     *                1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
     *                2 多了一次去重的动作,确保订阅者收到的消息有一次。
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

}

1.6、测试

下载mqttx软件进行测试 :https://mqttx.app/


文章作者: wmg
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 wmg !
  目录