Springboot实现MQTT通信

article/2025/9/11 23:25:37

目录

      • 一、MQTT简介
          • 1、MQTT协议
          • 2、MQTT协议特点
      • 二、MQTT服务器搭建
      • 三、使用Springboot整合MQTT协议
        • 1、在父工程下创建一个Springboot项目作为消息的提供者
          • 1.1 导入依赖包
          • 1.2 修改配置文件
          • 1.3 消息发布者客户端配置
          • 1.4 消息发布客户端回调
          • 1.5 创建控制器测试发布信息
        • 2、在父工程下创建一个Springboot项目作为消息的接受者
          • 2.1 导入依赖包
          • 2.2 修改配置文件
          • 2.3 接收者客户端配置
          • 2.4 消息接收者客户端回调
          • 2.5 控制器控制手动建立和断开连接方法
        • 3、测试
          • 3.1 分别启动两个项目,可以在管理界面看到创建的两个客户端
          • 3.2 调用发布消息接口
          • 3.3 测试断开连接接口
          • 3.4 测试建立连接接口

一、MQTT简介

1、MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

2、MQTT协议特点
  • MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。

  • MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。

发布和订阅
在这里插入图片描述
QoS(Quality of Service levels)

  • ① QoS 0 至多一次
    这一级别会发生消息丢失或重复,消息发布依赖于底层TCP/IP网络。即:<=1
    在这里插入图片描述

  • ② QoS 1 最少一次
    QoS 1 承诺消息将至少传送一次给订阅者。
    在这里插入图片描述

  • ③ QoS 2 只有一次
    使用 QoS 2,我们保证消息仅传送到目的地一次。为此,带有唯一消息 ID 的消息会存储两次,首先来自发送者,然后是接收者。QoS 级别 2 在网络中具有最高的开销,因为在发送方和接收方之间需要两个流。

在这里插入图片描述

二、MQTT服务器搭建

这里可以参考我的另一篇博客: MQTT服务器的搭建与MQTT客户端的使用.

三、使用Springboot整合MQTT协议

1、在父工程下创建一个Springboot项目作为消息的提供者

1.1 导入依赖包
<dependencies><!--mqtt相关依赖--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
1.2 修改配置文件
spring:application:name: provider#MQTT配置信息mqtt:#MQTT服务地址,端口号默认11883,如果有多个,用逗号隔开url: tcp://127.0.0.1:11883#用户名username: admin#密码password: 123456#客户端id(不能重复)client:id: provider-id#MQTT默认的消息推送主题,实际可在调用接口是指定default: topic: topic
server:port: 8080
1.3 消息发布者客户端配置
package com.lyp.mqttprovider.mqtt;import lombok.extern.slf4j.Slf4j;import javax.annotation.PostConstruct;import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class MqttProviderConfig {@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client.id}")private String clientId;@Value("${spring.mqtt.default.topic}")private String defaultTopic;/*** 客户端对象*/private MqttClient client;/*** 在bean初始化后连接到服务器*/@PostConstructpublic void init(){connect();}/*** 客户端连接服务端*/public void connect(){try{//创建MQTT客户端对象client = new MqttClient(hostUrl,clientId,new MemoryPersistence());//连接设置MqttConnectOptions options = new MqttConnectOptions();//是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接服务器都是以新的身份options.setCleanSession(true);//设置连接用户名options.setUserName(username);//设置连接密码options.setPassword(password.toCharArray());//设置超时时间,单位为秒options.setConnectionTimeout(100);//设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);//设置回调client.setCallback(new MqttProviderCallBack());client.connect(options);} catch(MqttException e){e.printStackTrace();}}public void publish(int qos,boolean retained,String topic,String message){MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setRetained(retained);mqttMessage.setPayload(message.getBytes());//主题的目的地,用于发布/订阅信息MqttTopic mqttTopic = client.getTopic(topic);//提供一种机制来跟踪消息的传递进度//用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度MqttDeliveryToken token;try {//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态//一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。token = mqttTopic.publish(mqttMessage);token.waitForCompletion();} catch (MqttException e) {e.printStackTrace();}}}
1.4 消息发布客户端回调
package com.lyp.mqttprovider.mqtt;import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqttProviderCallBack implements MqttCallback{@Value("${spring.mqtt.client.id}")private String clientId;/*** 与服务器断开的回调*/@Overridepublic void connectionLost(Throwable cause) {System.out.println(clientId+"与服务器断开连接");}/*** 消息到达的回调*/@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {}/*** 消息发布成功的回调*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {IMqttAsyncClient client = token.getClient();System.out.println(client.getClientId()+"发布消息成功!");}}
1.5 创建控制器测试发布信息
package com.lyp.mqttprovider.controller;import com.lyp.mqttprovider.mqtt.MqttProviderConfig;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;@Controller
public class SendController {@Autowiredprivate MqttProviderConfig providerClient;@RequestMapping("/sendMessage")@ResponseBodypublic String sendMessage(int qos,boolean retained,String topic,String message){try {providerClient.publish(qos, retained, topic, message);return "发送成功";} catch (Exception e) {e.printStackTrace();return "发送失败";}}
}

2、在父工程下创建一个Springboot项目作为消息的接受者

2.1 导入依赖包
<dependencies><!--mqtt相关依赖--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
2.2 修改配置文件
spring:application:name: consumer#MQTT配置信息mqtt:#MQTT服务端地址,端口默认为11883,如果有多个,用逗号隔开url: tcp://127.0.0.1:11883#用户名username: admin#密码password: 123456#客户端id(不能重复)client:id: consumer-id#MQTT默认的消息推送主题,实际可在调用接口时指定default:topic: topic
server:port: 8085
2.3 接收者客户端配置
package com.lyp.mqttconsumer.mqtt;import javax.annotation.PostConstruct;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqttConsumerConfig {@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client.id}")private String clientId;@Value("${spring.mqtt.default.topic}")private String defaultTopic;/*** 客户端对象*/private MqttClient client;/*** 在bean初始化后连接到服务器*/@PostConstructpublic void init(){connect();}/*** 客户端连接服务端*/public void connect(){try {//创建MQTT客户端对象client = new MqttClient(hostUrl,clientId,new MemoryPersistence());//连接设置MqttConnectOptions options = new MqttConnectOptions();//是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接到服务端都是以新的身份options.setCleanSession(true);//设置连接用户名options.setUserName(username);//设置连接密码options.setPassword(password.toCharArray());//设置超时时间,单位为秒options.setConnectionTimeout(100);//设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);//设置回调client.setCallback(new MqttConsumerCallBack());client.connect(options);//订阅主题//消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息int[] qos = {1,1};//主题String[] topics = {"topic1","topic2"};//订阅主题client.subscribe(topics,qos);} catch (MqttException e) {e.printStackTrace();}}/*** 断开连接*/public void disConnect(){try {client.disconnect();} catch (MqttException e) {e.printStackTrace();}}/*** 订阅主题*/public void subscribe(String topic,int qos){try {client.subscribe(topic,qos);} catch (MqttException e) {e.printStackTrace();}}
}
2.4 消息接收者客户端回调
package com.lyp.mqttconsumer.mqtt;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MqttConsumerCallBack implements MqttCallback{/*** 客户端断开连接的回调*/@Overridepublic void connectionLost(Throwable throwable) {System.out.println("与服务器断开连接,可重连");}/*** 消息到达的回调*/@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println(String.format("接收消息主题 : %s",topic));System.out.println(String.format("接收消息Qos : %d",message.getQos()));System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));System.out.println(String.format("接收消息retained : %b",message.isRetained()));}/*** 消息发布成功的回调*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}}
2.5 控制器控制手动建立和断开连接方法
package com.lyp.mqttconsumer.controller;import com.lyp.mqttconsumer.mqtt.MqttConsumerConfig;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;@Controller
public class TestController {@Autowiredprivate MqttConsumerConfig client;@Value("${spring.mqtt.client.id}")private String clientId;@RequestMapping("/connect")@ResponseBodypublic String connect(){client.connect();return clientId + "连接到服务器";}@RequestMapping("/disConnect")@ResponseBodypublic String disConnect(){client.disConnect();return clientId + "与服务器断开连接";}
}

3、测试

3.1 分别启动两个项目,可以在管理界面看到创建的两个客户端

在这里插入图片描述

3.2 调用发布消息接口

在这里插入图片描述
数据一致,测试成功
在这里插入图片描述

3.3 测试断开连接接口

在这里插入图片描述
查看客户端,确实已经断开连接
在这里插入图片描述

3.4 测试建立连接接口

在这里插入图片描述
客户端显示已经重新连接上,测试成功
在这里插入图片描述


http://chatgpt.dhexx.cn/article/xdQ3yl4B.shtml

相关文章

一步一步来:MQTT服务器搭建、MQTT客户端使用

物联网应用如火如荼&#xff0c;本文就物联网应用中最受青睐的协议 MQTT相关测试工具的使用进行简单说明。 希望此文能给需要用到的朋友一些微薄的帮助…… 一、MQTT服务器&#xff08; emqx &#xff09;搭建 1. 下载服务器MQTT Broker 从 https://www.emqx.io/cn/mqtt/public…

【MQTT基础篇(三)】连接MQTT服务端

文章目录 连接MQTT服务端1 CONNECT – 连接服务端1.1 clientId – 客户端ID1.2 cleanSession – 清除会话1.3 keepAlive – 心跳时间间隔 2 CONNACK – 确认连接请求2.1 returnCode – 连接返回码2.2 sessionPresent – 当前会话 连接MQTT服务端 MQTT客户端之间要想实现通讯&am…

MQTT介绍与使用

目录 一、MQTT简介 二、特性 三、实现方式   四、MQTT的搭建&#xff08;ubuntu&#xff09; 五、MQTT权限配置 六、MQTT实现&#xff08;Java语言&#xff09; 正文 物联网是新一代信息技术的重要组成部分&#xff0c;也是“信息化”时代的重要发展阶段。其英文名称是&am…

什么是MQTT

1、MQTT来龙去脉 1.1 什么是MQTT MQTT(英文全称Message Queuing Telemetry Transport&#xff0c;消息队列遥测传输协议)是一种基于发布/订阅&#xff08;PUBLISH/SUBSCRIBE&#xff09;模式的轻量级的物联网通信协议。从这个定义中我们可以总结出四个关键词&#xff1a;消息队…

设备分配与spooling技术详解

5.4 设备分配&#xff08;重点&#xff01;&#xff01;&#xff01;&#xff09; 一、设备分配中的数据结构二、设备分配时应考虑的因素三、设备独立性四、设备独占的分配程序五、Spooling技术 一、设备分配中的数据结构 在进行设备分配时&#xff0c;通常都要借助一些表格…

SPOOLing和虚拟化

什么是虚拟化 虚拟化就是无中生有&#xff0c;就是暗度陈仓&#xff08;狗头保命&#xff09;。仔细观察整个计算机系统的设计&#xff0c;到处都体现着虚拟化的技术。当然虚拟化就是操作系统设计的要求之一。我们很好理解 CPU 的虚拟化技术&#xff0c;也就是通过进程调度实现…

操作系统 假脱机(Spooling)系统

介绍 通过多道程序技术可将一台物理CPU虚拟为多台逻辑CPU&#xff0c;从而允许多个用户共享一台主机。那么&#xff0c;假脱机技术&#xff0c;则可将一台物理I/O设备虚拟为多台逻辑I/O设备&#xff0c;这样也就允许多个用户共享一台物理I/O设备。 1. 假脱机技术 早期&#…

题目SPOOLing系统的设计与实现

最近刚刚做的一个课程设计&#xff0c;关于SPOOLing的。 一、算法或原理的实现思想 技术原理 SPOOLing技术可将一台物理I/O设备虚拟为多台逻辑I/O设备&#xff0c;同样允许多个用户共享一台物理I/O设备。SPOOLing技术把所有用户进程的输出都送入输出井&#xff0c;然后再由输出…

精确度,准确度,精密度关系

1.精确度&#xff0c;准确度&#xff0c;精密度的关系 三者得关系大体可以理解为&#xff0c;准确度精密度 精确度&#xff0c;准确度反应距离真值得偏差&#xff0c;精密度反应测量得稳定性&#xff0c;精确度反应二者之综合。 三者得主次关系&#xff1a;精密度>准确度 …

Mysql的浮点精确度

1.mysql的用于记录小数的类型有三个float ,decimal 和double他们之间的关系 先创建一个表test都用了float ,decimal 和double 插入一条数据查看发现没有发现精度丢失问题 再插入一条数据&#xff0c;发现精度损失&#xff1a; 查看三个类型的范围&#xff1a; 插入小数的位数多…

验证集精确度和损失同时上升

目录 1. 实验结果2. 分析 1. 实验结果 下图中val_acc&#xff0c;val_loss分别表示验证集精确度和损失&#xff1b;train_acc&#xff0c;train_loss分别表示训练集精确度和损失。验证集精确度一直上升&#xff0c;但是损失在第六个epoch后也开始上升&#xff0c;如何解释&…

JavaScript超大或超小数值精确度丢失解决方案

情景一 接口字段&#xff0c;Number类型数据失真&#xff0c;解决方法可直接让服务端把字段类型改成String类型即可。 情景二 某些特殊场景&#xff0c;需要保留小数点后9位(及其以上)&#xff0c;直接调用Number对象自带的toFixed()函数&#xff0c;会出现小数点后数据失真…

关于JavaScript精确度问题

一、js精确度的安全范围是 -2^53 至 2^53 一旦超过这个范围则无法精确表示 1.解决方法 使用第三方包 JSON-Bigint JSONbig.parse() //转换出来的是一个BigNubmer对象 若要使用则用toString()方法 JSONbig.stringify() 2.当axios获取响应数据时自动会将数据JSON.parse()解析为…

【机器学习】准确率、精确度、召回率和 F1 定义

一、说明 数据科学家选择目标变量后 - 例如他们希望预测电子表格中的“列”&#xff0c;并完成了转换数据和构建模型的先决条件&#xff0c;最后步骤之一是评估模型的性能。 二、混淆矩阵的模型 2.1 混淆矩阵 选择性能指标通常取决于要解决的业务问题。假设您的数据集中有 10…

Python计算分类问题的评价指标(准确率、精确度、召回率和F1值,Kappa指标)

机器学习的分类问题常用评论指标有&#xff1a;准确率、精确度、召回率和F1值&#xff0c;还有kappa指标 。 每次调包去找他们的计算代码很麻烦&#xff0c;所以这里一次性定义一个函数&#xff0c;直接计算所有的评价指标。 每次输入预测值和真实值就可以得到上面的指标值&a…

batch_size对精确度和损失的影响

1 问题 在深度学习的学习过程中&#xff0c;模型性能对batchsize虽然没有学习率那么敏感&#xff0c;但是在进一步提升模型性能时&#xff0c;batch_size就会成为一个非常关键的参数。 batch_size对精度和损失的影响研究。 batch_size [,32,64,128&#xff0c;256] 不同batch_…

准度、精度傻傻分不清?

[导读] 做电子产品&#xff0c;常常遇到测量。此时就难免会关注到精度、准度等概念&#xff0c;遇到不少朋友对这两个概念不清楚&#xff0c;今天就来分享一下这两个概念。最近很忙&#xff0c;更的不及时&#xff0c;实在抱歉。也感谢大家不离不弃&#xff01;对于更文分享这件…

机器学习笔记--classification_report精确度/召回率/F1值

classification_report简介 sklearn中的classification_report函数用于显示主要分类指标的文本报告&#xff0e;在报告中显示每个类的精确度&#xff0c;召回率&#xff0c;F1值等信息。 主要参数: y_true&#xff1a;1维数组&#xff0c;或标签指示器数组/稀疏矩阵&#xf…

YOLOv5~目标检测模型精确度

还是yolo5的基础啊~~ 一些关于目标检测模型的评估指标&#xff1a;IOU、TP&FP&FN&TN、mAP等&#xff0c;并列举了目标检测中的mAP计算。 指标评估(重要的一些定义) IOU 也称重叠度表示计算预测回归框和真实回归框的交并比,计算公式如下: TP&FP&FN&…

睿智的目标检测20——利用mAP计算目标检测精确度

睿智的目标检测20——利用mAP计算目标检测精确度 学习前言GITHUB代码下载知识储备1、IOU的概念2、TP TN FP FN的概念3、precision&#xff08;精确度&#xff09;和recall&#xff08;召回率&#xff09;4、概念举例5、单个指标的局限性 什么是AP绘制mAP 学习前言 好多人都想算…