MQTT
(消息队列遥测传输协议)是一种轻量级的消息通信协议,主要用于在设备之间进行低带宽和高延迟的传输。MQTT
基于发布/订阅(Publish/Subscribe
)的模式,由 IBM 开发,现已成为物联网(IoT
)领域的标准协议。
MQTT
的主要特点如下:
QoS
(服务质量)等级保证消息传输的可靠性。SSL/TLS
加密确保数据安全。MQTT
广泛用于物联网、远程监控、消息推送等应用场景,成为设备间重要的通信手段。以下是若依前后端分离版项目集成MQTT
协议的详细步骤:
MQTT X
工具首先,前往 EMQX 官网 下载 MQTT X
,并完成安装。
打开MQTT X
,创建一个新的连接,设置连接名称,然后点击“连接”按钮。
ruoyi-common
模块的pom.xml
中添加依赖<!-- MQTT 相关依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
application.yml
文件中添加配置在Spring
配置中添加以下MQTT
相关配置:
# MQTT配置
mqtt:
username: admin # 用户名
password: admin # 密码
hostUrl: tcp://broker.emqx.io:1883 # 服务器地址
clientId: clientId # 客户端ID
defaultTopic: topic,topic1 # 订阅主题
timeout: 100 # 超时时间
keepalive: 60 # 心跳间隔
enabled: true # 是否启用MQTT功能
mqtt
相关工具类在 ruoyi-common\src\main\java\com\ruoyi\common\utils
目录下创建 mqtt
文件夹,并添加以下三个类:
package com.ruoyi.common.utils.mqtt;
import com.ruoyi.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {
@Autowired
private MqttPushClient mqttPushClient;
// 一系列配置属性省略...
@Bean
public MqttPushClient getMqttPushClient() {
if(enabled) {
String[] mqttTopics = StringUtils.split(defaultTopic, ",");
mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
for (String topic : mqttTopics) {
mqttPushClient.subscribe(topic, 0);
}
}
return mqttPushClient;
}
}
package com.ruoyi.common.utils.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class MqttPushClient {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
private static MqttClient client;
public void connect(String host, String clientId, String username, String password, int timeout, int keepalive) {
try {
client = new MqttClient(host, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
client.setCallback(new PushCallback());
client.connect(options);
} catch (Exception e) {
logger.error("MQTT连接失败", e);
}
}
public void publish(int qos, boolean retained, String topic, String message) {
try {
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
client.getTopic(topic).publish(mqttMessage).waitForCompletion();
} catch (Exception e) {
logger.error("消息发布失败", e);
}
}
public void subscribe(String topic, int qos) {
try {
client.subscribe(topic, qos);
} catch (MqttException e) {
logger.error("订阅失败", e);
}
}
}
package com.ruoyi.common.utils.mqtt;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class PushCallback implements MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);
@Override
public void connectionLost(Throwable cause) {
logger.info("连接丢失,正在尝试重新连接...");
// 断线重连逻辑
}
@Override
public void messageArrived(String topic, MqttMessage message) {
logger.info("主题:{},消息:{}", topic, new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("消息发送成功:{}", token.isComplete());
}
}
运行若依项目后,后端可以成功接收到通过MQTT X
工具发送的数据,显示如下:
powered by kaifamiao