一、MQTT说明
1.1、mqtt文档
官网:https://mqtt.org/
仅供参考:https://www.emqx.com/zh/mqtt
1.2、MQTT消息服务
MQTT规定了3种消息等级:
1.QoS 0:
消息最多传递一次,不需要客户端给与回复,如果当时客户端不可用,则会丢失该消息。
2.QoS 1:
a、消息传递至少 1 次,发布者会发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内要收到接收者的应答,发布者若没收到应答,会将消息的 DUP 置为 1 并重发消息。
b、所以Qos 1消息级别取决于接受者在规定时间内给与发布者反馈,若没有反馈,则会再次接受到消息。
3.QoS
a、消息仅传送一次,发布者发布 QoS 为 2 的消息之后,会将发布的消息储存起来并等待接收者回复 PUBREC 的消息,发送者收到 PUBREC 消息后,它就可以安全丢弃掉之前的发布消息,因为它已经知道接收者成功收到了消息。
b、发布者会保存 PUBREC 消息并应答一个 PUBREL,等待接收者回复 PUBCOMP 消息,当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。
c、QoS 2 消息的核心是接收者给发布者反馈两次接收结果,相当于一次接收,一次确认接收。
1.3、归纳
- QoS 0 消息只发一次,不在乎是否被别的客户端收到,只要发送了就算结束。
- QoS 1 消息需要消息接收者在规定时间内给予反馈,结束的标志是在发送后规定时间内收到反馈,否则就会一直发送。
- QoS 2 消息需要发送者和接收者双方互相进行消息确认,只要有一方没有确定就不会结束。
二、MQTT环境搭建
有2种方式
1、原生mqtt
2、rabbitmq的mqtt插件
第一种
centos、Ubuntu 安装mqtt和使用https://blog.csdn.net/qq_44413835/article/details/120606097
mqtt客户端下载:
我是使用MQTTBox: https://dl.pconline.com.cn/download/1323304.html
mqttx下载:https://mqttx.app/zh
第二种
安装rabbitmq在开启mqtt插件-好处rabbitmq有web管理平台
注:如果不会使用rabbitmq查看我的消息队列的专栏,里面有集成篇
docker安装rabbitmq: https://blog.csdn.net/qq_44413835/article/details/123648048
进入docker-rabbitmq容器
docker exec -it rabbitmq /bin/bash
安装后开启mqq插件
# 打开rabbitmq_mqtt
rabbitmq-plugins enable rabbitmq_mqtt
# 打开rabbitmq_web_mqtt
rabbitmq-plugins enable rabbitmq_web_mqtt
三、Springboot集成原生mqtt
1.1、项目依赖
<!--集成MQTT-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--开启流支持-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<!--gson序列化工具-->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
1.2、application.yml配置
mqtt:
url: tcp://ip:1883
username: guest
password: guest
keep-alive: 30
connection-timeout: 5000
# 发送的主题,#为通配符
sender:
clientId: serverId
defaultTopic: dji-StoA/#,autel-StoA/#
# 默认接受消息的主题
receiver:
clientId: ${random.value}
defaultTopic: dji-AtoS,autel-AtoS,willTopic
deafultTopic : default
1.3、mqtt配置类
MqttConfig
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.jeecg.modules.mqtt.consumer.MqttMsgConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.security.SecureRandom;
@Configuration
@Slf4j
@IntegrationComponentScan
public class MqttConfig {
/**
* 发布的bean名称
* outbound 是发送消息到mqtt broker
* inbound是接收mqtt消息进行处理
*/
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
/**客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱"消息*/
private static final byte[] WILL_DATA= "todoxxxx".getBytes();
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.sender.clientId}")
private String senderClientId;
@Value("${mqtt.sender.defaultTopic}")
private String senderDefaultTopic;
@Value("${mqtt.receiver.clientId}")
private String receiverClientId;
@Value("${mqtt.receiver.defaultTopic}")
private String receiverDefaultTopic;
/**
* MQTT连接器选项
*/
@Bean
public MqttConnectOptions getSenderMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
// 设置连接的用户名
if (StrUtil.isNotBlank(username)) {
options.setUserName(username);
}
// 设置连接的密码
options.setPassword(password.toCharArray());
// 设置连接的地址
options.setServerURIs(new String[]{url});
// 设置超时时间 单位为秒
options.setConnectionTimeout(15);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
// 但这个方法并没有重连的机制
options.setKeepAliveInterval(30);
// 客户端掉线后 服务器端不会清除session,当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息
options.setCleanSession(false);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
// options.setWill("willTopic", WILL_DATA, 2, false);
return options;
}
/**
* MQTT客户端
*/
@Bean
public MqttPahoClientFactory senderMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getSenderMqttConnectOptions());
return factory;
}
/**
* 发送通道配置 默认主题
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound() {
//clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线
String clientIdStr = receiverClientId + new SecureRandom().nextInt();
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientIdStr, senderMqttClientFactory());
//async如果为true,则调用方不会阻塞。而是在发送消息时等待传递确认。默认值为false(发送将阻塞,直到确认发送)
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(true);
messageHandler.setDefaultTopic(senderDefaultTopic);
messageHandler.setDefaultQos(1);
return messageHandler;
}
/**
* MQTT发送通道
*/
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT接收通道
*/
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* MQTT配置监听的 topic 支持通配符
*/
@Bean
public MessageProducer inbound() {
//clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线
String serverIdStr = senderClientId + new SecureRandom().nextInt(10);
String[] receiverTopics = receiverDefaultTopic.split(",");
// 可以同时消费(订阅)多个Topic
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
serverIdStr, senderMqttClientFactory(),
receiverTopics);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// 设置订阅通道
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
/**
* MQTT消息处理器(消费者)
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
return new MqttMsgConsumer();
}
/**
* @desc mqtt连接失败或者订阅失败时,触发MqttConnectionFailedEvent事件
* @param event
* @return void
*/
@EventListener(MqttConnectionFailedEvent.class)
public void mqttConnectionFailedEvent(MqttConnectionFailedEvent event) {
log.error("-----mqtt----连接mqtt失败: date={}, hostUrl={}, username={}, error={}",
DateUtil.now(), url, username, event.getCause().getMessage());
}
/**
* @desc 当async和async事件(async-events)都为true时,将发出MqttMessageSentEvent
* 它包含消息、主题、客户端库生成的消息id、clientId和clientInstance(每次连接客户端时递增)
* @param event
* @return void
*/
@EventListener(MqttMessageSentEvent.class)
public void mqttMessageSentEvent(MqttMessageSentEvent event) {
// log.info("-----mqtt----发送信息: date={}, info={}, ", DateUtil.now(), event.toString());
}
/**
* @desc 当async和async事件(async-events)都为true时,将发出MqttMessageDeliveredEvent
* 当客户端库确认传递时,将发出MqttMessageDeliveredEvent。它包含messageId、clientId和clientInstance,使传递与发送相关。
* @param event
* @return void
*/
@EventListener(MqttMessageDeliveredEvent.class)
public void mqttMessageDeliveredEvent(MqttMessageDeliveredEvent event) {
// log.info("-----mqtt----发送成功信息: date={}, info={}", DateUtil.now(), event.toString());
}
/**
* @desc 成功订阅到主题,MqttSubscribedEvent事件就会被触发(多个主题,多次触发)
* @param event
* @return void
*/
@EventListener(MqttSubscribedEvent.class)
public void mqttSubscribedEvent(MqttSubscribedEvent event) {
// log.info("-----mqtt----订阅成功信息: date={}, info={}", DateUtil.now(), event.toString());
}
}
1.4、mqtt消费接口
MqttMsgConsumer
/**
* MQTT接收消息
*/
@Service
@Slf4j
public class MqttMsgConsumer implements MessageHandler {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = Objects.requireNonNull(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)).toString();
String payload = message.getPayload().toString();
System.err.println("主题:" + topic + " 接收的数据:" + payload);
}
}
1.5、mqtt发布接口
IMqttSender
import org.jeecg.modules.mqtt.config.MqttConfig;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* MQTT发送网关
* 通过接口将数据传递到集成流
*/
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {
/**
* 发送信息到MQTT服务器
*
* @param data 发送的文本
*/
void sendToMqtt(String data);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param qos 对消息处理的几种机制。
* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
* 2 多了一次去重的动作,确保订阅者收到的消息有一次。
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
1.6、测试
下载mqttx软件进行测试 :https://mqttx.app/