开发喵星球

若依前后端分离版集成MQTT(267)

需求提出

在若依前后端分离项目中集成MQTT,实现设备之间的低带宽和高延迟通信,适用于物联网(IoT)领域。

相关介绍

MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,基于发布/订阅模式,适用于受限环境的设备,如传感器、嵌入式设备等。它的特点包括轻量级、简单易用、低带宽、高延迟、可靠性和灵活性,被广泛应用于物联网、传感器网络、远程监控、消息通知等场景。

解决思路

  1. 下载并安装MQTT X测试工具。
  2. 配置MQTT X进行连接。
  3. 在项目中添加MQTT依赖。
  4. 在application.yml中添加MQTT相关配置。
  5. 实现MQTT的配置和客户端类。
  6. 运行项目,验证集成效果。

所需技术

项目结构

ruoyi
├── ruoyi-admin
│   └── src
│       └── main
│           └── resources
│               └── application.yml
├── ruoyi-common
│   └── pom.xml
│   └── src
│       └── main
│           └── java
│               └── com
│                   └── ruoyi
│                       └── common
│                           └── utils
│                               └── mqtt
│                                   ├── MqttConfig.java
│                                   ├── MqttPushClient.java
│                                   └── PushCallback.java

注意事项

完整代码

第一步:下载MQTT X测试工具

下载并安装MQTT X测试工具,

官网地址:https://www.emqx.com/zh/try?product=emqx-ecp

第二步:配置MQTT X进行连接

打开MQTT X,新建连接,输入名称,点击连接。

第三步:在ruoyi-common模块下的pom.xml加入依赖

<!--mqtt依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

第四步:在application.yml添加相关配置

application.yml中添加MQTT相关配置:

spring:
  mqtt:
    username: admin # 用户名
    password: admin # 密码
    hostUrl: tcp://broker.emqx.io:1883 # tcp://ip:端口
    clientId: clientId # 客户端id
    defaultTopic: topic,topic1 # 订阅主题
    timeout: 100 # 超时时间 (单位:秒)
    keepalive: 60 # 心跳 (单位:秒)
    enabled: true # 是否使用mqtt功能

第五步:在ruoyi-common\src\main\java\com\ruoyi\common\utils\mqtt目录下新建三个文件

MqttConfig.java

package com.ruoyi.common.utils.mqtt;

import com.ruoyi.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {
    @Autowired
    private MqttPushClient mqttPushClient;

    private String username;
    private String password;
    private String hostUrl;
    private String clientId;
    private String defaultTopic;
    private int timeout;
    private int keepalive;
    private boolean enabled;

    // Getters and Setters

    @Bean
    public MqttPushClient getMqttPushClient() {
        if(enabled) {
            String[] mqtt_topic = StringUtils.split(defaultTopic, ",");
            mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
            for(String topic : mqtt_topic) {
                mqttPushClient.subscribe(topic, 0);
            }
        }
        return mqttPushClient;
    }
}

MqttPushClient.java

package com.ruoyi.common.utils.mqtt;

import com.ruoyi.common.core.domain.AjaxResult;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MqttPushClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private PushCallback pushCallback;

    private static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
        try {
            client = new MqttClient(host, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            setClient(client);
            client.setCallback(pushCallback);
            client.connect(options);
        } catch (Exception e) {
            logger.error("MQTT connection failed", e);
        }
    }

    public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) {
        try {
            MqttMessage message = new MqttMessage();
            message.setQos(qos);
            message.setRetained(retained);
            message.setPayload(pushMessage.getBytes());
            MqttTopic mTopic = getClient().getTopic(topic);
            if (mTopic == null) {
                logger.error("Topic not exist");
                return AjaxResult.error("Topic not exist");
            }
            MqttDeliveryToken token = mTopic.publish(message);
            token.waitForCompletion();
            return AjaxResult.success();
        } catch (MqttException e) {
            logger.error("MQTT publish failed", e);
            return AjaxResult.error();
        }
    }

    public void subscribe(String topic, int qos) {
        try {
            getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            logger.error("MQTT subscription failed", e);
        }
    }
}

PushCallback.java

package com.ruoyi.common.utils.mqtt;

import com.alibaba.fastjson2.JSONObject;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class PushCallback implements MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);

    @Autowired
    private MqttConfig mqttConfig;

    private static MqttClient client;
    private static String _topic;
    private static String _qos;
    private static String _msg;

    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("连接断开,可以做重连");
        if (client == null || !client.isConnected()) {
            mqttConfig.getMqttPushClient();
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        logger.info("接收消息主题 : " + topic);
        logger.info("接收消息Qos : " + mqttMessage.getQos());
        logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));

        _topic = topic;
        _qos = String.valueOf(mqttMessage.getQos());
        _msg = new String(mqttMessage.getPayload());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    public String receive() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("topic", _topic);
        jsonObject.put("qos", _qos);
        jsonObject.put("msg", _msg);
        return jsonObject.toString();
    }
}

第六步:运行项目,后端可以接收MQTT X发来的数据

运行若依项目,后端可以接收MQTT X发来的数据。


运行结果

通过上述步骤,成功将 MQTT 集成到若依前后端分离项目,实现了设备之间的低带宽和高延迟通信。

   
分类:Java/OOP 作者:无限繁荣, 吴蓉 发表于:2024-07-12 15:35:56 阅读量:139
<<   >>


powered by kaifamiao