开发喵星球

若依前后端分离版集成 MQTT 协议(354)

MQTT(消息队列遥测传输协议)是一种轻量级的消息通信协议,主要用于在设备之间进行低带宽和高延迟的传输。MQTT基于发布/订阅(Publish/Subscribe)的模式,由 IBM 开发,现已成为物联网(IoT)领域的标准协议。

MQTT 的主要特点如下:

  1. 轻量级:协议设计简单,适用于资源有限的设备(如传感器、嵌入式系统),传输数据开销极小。
  2. 简单的发布/订阅模式:实现了发布者与订阅者之间的解耦,通信过程简洁明了。
  3. 适应低带宽与高延迟环境:在网络状况不佳时依然可以稳定通信,专为此类场景设计。
  4. 高可靠性:支持消息确认和持久化机制,通过QoS(服务质量)等级保证消息传输的可靠性。
  5. 灵活安全:支持不同类型的消息格式和内容(文本、二进制等),并支持SSL/TLS加密确保数据安全。
  6. 广泛应用场景:由于其轻量和灵活性,MQTT广泛用于物联网、远程监控、消息推送等应用场景,成为设备间重要的通信手段。

以下是若依前后端分离版项目集成MQTT协议的详细步骤:

1. 下载并安装MQTT X工具

首先,前往 EMQX 官网 下载 MQTT X,并完成安装。

安装完成

2. 建立连接

打开MQTT X,创建一个新的连接,设置连接名称,然后点击“连接”按钮。

创建连接

3. 在ruoyi-common模块的pom.xml中添加依赖

<!-- MQTT 相关依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

4. 在application.yml文件中添加配置

Spring配置中添加以下MQTT相关配置:

# MQTT配置
mqtt:
  username: admin # 用户名
  password: admin # 密码
  hostUrl: tcp://broker.emqx.io:1883 # 服务器地址
  clientId: clientId # 客户端ID
  defaultTopic: topic,topic1 # 订阅主题
  timeout: 100 # 超时时间
  keepalive: 60 # 心跳间隔
  enabled: true # 是否启用MQTT功能

配置示例

5. 新建mqtt相关工具类

ruoyi-common\src\main\java\com\ruoyi\common\utils 目录下创建 mqtt 文件夹,并添加以下三个类:

MqttConfig.java

package com.ruoyi.common.utils.mqtt;

import com.ruoyi.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {
    @Autowired
    private MqttPushClient mqttPushClient;

    // 一系列配置属性省略...

    @Bean
    public MqttPushClient getMqttPushClient() {
        if(enabled) {
            String[] mqttTopics = StringUtils.split(defaultTopic, ",");
            mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
            for (String topic : mqttTopics) {
                mqttPushClient.subscribe(topic, 0);
            }
        }
        return mqttPushClient;
    }
}

MqttPushClient.java

package com.ruoyi.common.utils.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class MqttPushClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
    private static MqttClient client;

    public void connect(String host, String clientId, String username, String password, int timeout, int keepalive) {
        try {
            client = new MqttClient(host, clientId, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            client.setCallback(new PushCallback());
            client.connect(options);
        } catch (Exception e) {
            logger.error("MQTT连接失败", e);
        }
    }

    public void publish(int qos, boolean retained, String topic, String message) {
        try {
            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            mqttMessage.setQos(qos);
            mqttMessage.setRetained(retained);
            client.getTopic(topic).publish(mqttMessage).waitForCompletion();
        } catch (Exception e) {
            logger.error("消息发布失败", e);
        }
    }

    public void subscribe(String topic, int qos) {
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            logger.error("订阅失败", e);
        }
    }
}

PushCallback.java

package com.ruoyi.common.utils.mqtt;

import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class PushCallback implements MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);

    @Override
    public void connectionLost(Throwable cause) {
        logger.info("连接丢失,正在尝试重新连接...");
        // 断线重连逻辑
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        logger.info("主题:{},消息:{}", topic, new String(message.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        logger.info("消息发送成功:{}", token.isComplete());
    }
}

6. 测试效果

运行若依项目后,后端可以成功接收到通过MQTT X工具发送的数据,显示如下:

成功接收

   
分类:Java/OOP 作者:无限繁荣, 吴蓉 发表于:2024-10-02 00:20:51 阅读量:127
<<   >>


powered by kaifamiao