在若依前后端分离项目中集成MQTT
,实现设备之间的低带宽和高延迟通信,适用于物联网(IoT)领域。
MQTT
(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,基于发布/订阅模式,适用于受限环境的设备,如传感器、嵌入式设备等。它的特点包括轻量级、简单易用、低带宽、高延迟、可靠性和灵活性,被广泛应用于物联网、传感器网络、远程监控、消息通知等场景。
ruoyi
├── ruoyi-admin
│ └── src
│ └── main
│ └── resources
│ └── application.yml
├── ruoyi-common
│ └── pom.xml
│ └── src
│ └── main
│ └── java
│ └── com
│ └── ruoyi
│ └── common
│ └── utils
│ └── mqtt
│ ├── MqttConfig.java
│ ├── MqttPushClient.java
│ └── PushCallback.java
MQTT X
工具正确安装和配置。MQTT
相关配置正确,避免连接失败。下载并安装MQTT X
测试工具,
官网地址:https://www.emqx.com/zh/try?product=emqx-ecp
打开MQTT X
,新建连接,输入名称,点击连接。
ruoyi-common
模块下的pom.xml
加入依赖<!--mqtt依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
application.yml
添加相关配置在application.yml
中添加MQTT相关配置:
spring:
mqtt:
username: admin # 用户名
password: admin # 密码
hostUrl: tcp://broker.emqx.io:1883 # tcp://ip:端口
clientId: clientId # 客户端id
defaultTopic: topic,topic1 # 订阅主题
timeout: 100 # 超时时间 (单位:秒)
keepalive: 60 # 心跳 (单位:秒)
enabled: true # 是否使用mqtt功能
ruoyi-common\src\main\java\com\ruoyi\common\utils\mqtt
目录下新建三个文件package com.ruoyi.common.utils.mqtt;
import com.ruoyi.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {
@Autowired
private MqttPushClient mqttPushClient;
private String username;
private String password;
private String hostUrl;
private String clientId;
private String defaultTopic;
private int timeout;
private int keepalive;
private boolean enabled;
// Getters and Setters
@Bean
public MqttPushClient getMqttPushClient() {
if(enabled) {
String[] mqtt_topic = StringUtils.split(defaultTopic, ",");
mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
for(String topic : mqtt_topic) {
mqttPushClient.subscribe(topic, 0);
}
}
return mqttPushClient;
}
}
package com.ruoyi.common.utils.mqtt;
import com.ruoyi.common.core.domain.AjaxResult;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MqttPushClient {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private PushCallback pushCallback;
private static MqttClient client;
private static MqttClient getClient() {
return client;
}
private static void setClient(MqttClient client) {
MqttPushClient.client = client;
}
public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
try {
client = new MqttClient(host, clientID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
setClient(client);
client.setCallback(pushCallback);
client.connect(options);
} catch (Exception e) {
logger.error("MQTT connection failed", e);
}
}
public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) {
try {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mTopic = getClient().getTopic(topic);
if (mTopic == null) {
logger.error("Topic not exist");
return AjaxResult.error("Topic not exist");
}
MqttDeliveryToken token = mTopic.publish(message);
token.waitForCompletion();
return AjaxResult.success();
} catch (MqttException e) {
logger.error("MQTT publish failed", e);
return AjaxResult.error();
}
}
public void subscribe(String topic, int qos) {
try {
getClient().subscribe(topic, qos);
} catch (MqttException e) {
logger.error("MQTT subscription failed", e);
}
}
}
package com.ruoyi.common.utils.mqtt;
import com.alibaba.fastjson2.JSONObject;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class PushCallback implements MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);
@Autowired
private MqttConfig mqttConfig;
private static MqttClient client;
private static String _topic;
private static String _qos;
private static String _msg;
@Override
public void connectionLost(Throwable throwable) {
logger.info("连接断开,可以做重连");
if (client == null || !client.isConnected()) {
mqttConfig.getMqttPushClient();
}
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
logger.info("接收消息主题 : " + topic);
logger.info("接收消息Qos : " + mqttMessage.getQos());
logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
_topic = topic;
_qos = String.valueOf(mqttMessage.getQos());
_msg = new String(mqttMessage.getPayload());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
public String receive() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", _topic);
jsonObject.put("qos", _qos);
jsonObject.put("msg", _msg);
return jsonObject.toString();
}
}
运行若依项目,后端可以接收MQTT X发来的数据。
通过上述步骤,成功将 MQTT
集成到若依前后端分离项目,实现了设备之间的低带宽和高延迟通信。
powered by kaifamiao