springboot 整合 mqtt

article/2025/9/11 23:25:38

springboot 整合 mqtt

最近由于iot越来越火, 物联网的需求越来越多, 那么理所当然的使用mqtt的场景也就越来越多,
接下来是我使用springboot整合mqtt的过程, 以及踩过的一些坑.

mqtt服务器使用的是 EMQX, 官网 : 这里

搭建的时候如果你使用的是集群 记得开放以下端口:

EMQX集群端口

好了, 搭建成功下一步就是我们的java程序要与mqtt连接, 这里有两种方式(其实不止两种)进行连接.
一是 直接使用 MQTT Java 客户端库,详情可以查看官方的例子: MQTT Java 客户端 我就跳过了

二是使用 spring integration mqtt也是比较推荐的一种,也是我们主讲这种.

第一步 添加 maven dependency

        <dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.14</version></dependency>

第二步 添加配置

1 先写好一些基本配置

mqtt:username: test                        # 账号password: 123456                      # 密码host-url: tcp://127.0.0.1:1883        # mqtt连接tcp地址in-client-id: ${random.value}         # 随机值,使出入站 client ID 不同out-client-id: ${random.value}client-id: ${random.int}                   # 客户端Id,不能相同,采用随机数 ${random.value}default-topic: test/#,topic/+/+/up         # 默认主题timeout: 60                                # 超时时间keepalive: 60                              # 保持连接clearSession: true                         # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
  1. 然后写一个对应的类MqttProperties
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** MqttProperties ** @author hengzi* @date 2022/8/23*/
@Component
public class MqttProperties {/*** 用户名*/@Value("${mqtt.username}")private String username;/*** 密码*/@Value("${mqtt.password}")private String password;/*** 连接地址*/@Value("${mqtt.host-url}")private String hostUrl;/*** 进-客户Id*/@Value("${mqtt.in-client-id}")private String inClientId;/*** 出-客户Id*/@Value("${mqtt.out-client-id}")private String outClientId;/*** 客户Id*/@Value("${mqtt.client-id}")private String clientId;/*** 默认连接话题*/@Value("${mqtt.default-topic}")private String defaultTopic;/*** 超时时间*/@Value("${mqtt.timeout}")private int timeout;/*** 保持连接数*/@Value("${mqtt.keepalive}")private int keepalive;/**是否清除session*/@Value("${mqtt.clearSession}")private boolean clearSession;// ...getter and setter}

接下来就是配置一些乱七八糟的东西, 这里有很多概念性的东西 比如 管道channel, 适配器 adapter, 入站Inbound, 出站Outbound,等等等等, 看起来是非常头痛的

好吧,那就一个一个来,

首先连接mqtt需要一个客户端, 那么我们就开一个客户端工厂, 这里可以产生很多很多的客户端

    @Beanpublic MqttPahoClientFactory mqttPahoClientFactory(){DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(mqttProperties.getHostUrl().split(","));options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());factory.setConnectionOptions(options);return factory;}

然后再搞两根管子(channel),一个出站,一个入站

    //出站消息管道,@Beanpublic MessageChannel mqttOutboundChannel(){return new DirectChannel();}// 入站消息管道@Beanpublic MessageChannel mqttInboundChannel(){return new DirectChannel();}

为了使这些管子能流通 就需要一个适配器(adapter)

// Mqtt 管道适配器@Beanpublic MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));}

然后定义消息生产者

    // 消息生产者@Beanpublic MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());//入站投递的通道adapter.setOutputChannel(mqttInboundChannel());adapter.setQos(1);return adapter;}

那我们收到消息去哪里处理(消费)呢,答案是这里:

@Bean//使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行@ServiceActivator(inputChannel = "mqttInboundChannel")public MessageHandler handleMessage() {// 这个 mqttMessageHandle 其实就是一个 MessageHandler 的实现类(这个类我放下面)return mqttMessageHandle;// 你也可以这样写
//        return new MessageHandler() {
//            @Override
//            public void handleMessage(Message<?> message) throws MessagingException {
//                // do something
//            }
//        };}

到这里我们其实已经可以接受到来自mqtt的消息了

接下来配置向mqtt发送消息

配置 出站处理器

    // 出站处理器@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound(MqttPahoClientFactory factory){MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);handler.setAsync(true);handler.setConverter(new DefaultPahoMessageConverter());handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);return handler;}

这个 出站处理器 在我看来就是让别人 (MqttPahoMessageHandler)处理了, 我就不处理了,我只管我要发送什么,至于怎么发送,由MqttPahoMessageHandler来完成

接下来我们定义一个接口即可

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** MqttGateway** @author hengzi* @date 2022/8/23*/@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String data);
}

我们直接调用这个接口就可以向mqtt 发送数据


到目前为止,整个配置文件长这样:


import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;/*** MqttConfig** @author hengzi* @date 2022/8/23*/
@Configuration
public class MqttConfig {/***  以下属性将在配置文件中读取**/@Autowiredprivate MqttProperties mqttProperties;//Mqtt 客户端工厂@Beanpublic MqttPahoClientFactory mqttPahoClientFactory(){DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(mqttProperties.getHostUrl().split(","));options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());factory.setConnectionOptions(options);return factory;}// Mqtt 管道适配器@Beanpublic MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));}// 消息生产者@Beanpublic MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());//入站投递的通道adapter.setOutputChannel(mqttInboundChannel());adapter.setQos(1);return adapter;}// 出站处理器@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound(MqttPahoClientFactory factory){MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);handler.setAsync(true);handler.setConverter(new DefaultPahoMessageConverter());handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);return handler;}@Bean//使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行@ServiceActivator(inputChannel = "mqttInboundChannel")public MessageHandler handleMessage() {return mqttMessageHandle;}//出站消息管道,@Beanpublic MessageChannel mqttOutboundChannel(){return new DirectChannel();}// 入站消息管道@Beanpublic MessageChannel mqttInboundChannel(){return new DirectChannel();}}

处理消息的 MqttMessageHandle

@Component
public class MqttMessageHandle implements MessageHandler {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {}
}

在进一步了解之后,发现可以优化的地方,比如channel 的类型是有很多种的, 这里使用的DirectChannel,是Spring Integration默认的消息通道,它将消息发送给为一个订阅者,然后阻碍发送直到消息被接收,传输方式都是同步的方式,都是由一个线程来运行的.

这里我们可以将入站channel改成 ExecutorChannel一个可以使用多线程的channel

@Beanpublic ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor(){ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 最大可创建的线程数int maxPoolSize = 200;executor.setMaxPoolSize(maxPoolSize);// 核心线程池大小int corePoolSize = 50;executor.setCorePoolSize(corePoolSize);// 队列最大长度int queueCapacity = 1000;executor.setQueueCapacity(queueCapacity);// 线程池维护线程所允许的空闲时间int keepAliveSeconds = 300;executor.setKeepAliveSeconds(keepAliveSeconds);// 线程池对拒绝任务(无线程可用)的处理策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}// 入站消息管道@Beanpublic MessageChannel mqttInboundChannel(){// 用线程池return new ExecutorChannel(mqttThreadPoolTaskExecutor());}

到这里其实可以运行了.

但是这样配置其实还是有点多, 有点乱, 于是我查找官网, f发现一种更简单的配置方法 叫 Java DSL, 官网连接: Configuring with the Java DSL

我们参考官网,稍微改一下,使用 DSL的方式进行配置:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;/*** MqttConfigV2** @author hengzi* @date 2022/8/24*/
@Configuration
public class MqttConfigV2 {@Autowiredprivate MqttProperties mqttProperties;@Autowiredprivate MqttMessageHandle mqttMessageHandle;//Mqtt 客户端工厂 所有客户端从这里产生@Beanpublic MqttPahoClientFactory mqttPahoClientFactory(){DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(mqttProperties.getHostUrl().split(","));options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());factory.setConnectionOptions(options);return factory;}// Mqtt 管道适配器@Beanpublic MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));}// 消息消费者 (接收,处理来自mqtt的消息)@Beanpublic IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {adapter.setCompletionTimeout(5000);adapter.setQos(1);return IntegrationFlows.from( adapter).channel(new ExecutorChannel(mqttThreadPoolTaskExecutor())).handle(mqttMessageHandle).get();}@Beanpublic ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor(){ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 最大可创建的线程数int maxPoolSize = 200;executor.setMaxPoolSize(maxPoolSize);// 核心线程池大小int corePoolSize = 50;executor.setCorePoolSize(corePoolSize);// 队列最大长度int queueCapacity = 1000;executor.setQueueCapacity(queueCapacity);// 线程池维护线程所允许的空闲时间int keepAliveSeconds = 300;executor.setKeepAliveSeconds(keepAliveSeconds);// 线程池对拒绝任务(无线程可用)的处理策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}// 出站处理器 (向 mqtt 发送消息 生产者)@Beanpublic IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);handler.setAsync(true);handler.setConverter(new DefaultPahoMessageConverter());handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get();}}

这样看起来真的简单多了, 头也没那么大了, 我要是早知道多好.

好了以上就是配置相关的, 到这里其实是已经完成springboot 与 mqtt 的整合了.


但其实我一直有个想法, 就是我们接收的消息 都是在 handleMessage这个方法里面执行的,

	@Overridepublic void handleMessage(Message<?> message) throws MessagingException {}

所以我就有了一个想法, 能不能根据 我订阅的主题,在不同的方法执行, 对于这个问题,其实你用if ... else ...也能实现, 但很明显,如果我订阅的主题很多的话, 那写起来就很头痛了.

对于这个问题,有两种思路, 一个是添加Spring Integration的路由 router,根据不同topic路由到不同的channel, 这个我也知道能不能实现, 我这里就不讨论了.

第二种是, 我也不知道名字改如何叫, 我是参考了 spring@Controller的设计, 暂且叫他注解模式.

众所周知,我们的接口都是在类上加 @Controller这个注解, 就代表这个类是 http 接口, 再在方法加上 @RequestMapping就能实现不同的 url 调用不同的方法.

参数这个设计 我们在类上面加 @MqttService就代表这个类是专门处理mqtt消息的服务类
同时 在这个类的方法上 加上 @MqttTopic就代表 这个主题由这个方法处理.

OK, 理论有了,接下来就是 实践.

先定义 两个注解

import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;import java.lang.annotation.*;@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {@AliasFor(annotation = Component.class)String value() default "";
}

加上 @Component注解 spring就会扫描, 并注册到IOC容器里


import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {/*** 主题名字*/String value() default "";}

参考 @RequestMapping我们使用起来应该是这样的:


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;/*** MqttTopicHandle** @author hengzi* @date 2022/8/24*/
@MqttService
public class MqttTopicHandle {public static final Logger log = LoggerFactory.getLogger(MqttTopicHandle.class);// 这里的 # 号是通配符@MqttTopic("test/#")public void test(Message<?> message){log.info("test="+message.getPayload());}// 这里的 + 号是通配符@MqttTopic("topic/+/+/up")public void up(Message<?> message){log.info("up="+message.getPayload());}// 注意 你必须先订阅@MqttTopic("topic/1/2/down")public void down(Message<?> message){log.info("down="+message.getPayload());}
}

OK 接下来就是实现这样的使用

分析 :

当我们收到消息时, 我们从IOC容器中 找到所有 带 @MqttService注解的类

然后 遍历这些类, 找到带有 @MqttTopic的方法

接着 把 @MqttTopic注解的的值 与 接受到的topic 进行对比

如果一致则执行这个方法

废话少说, 上代码


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;/*** MessageHandleService** @author hengzi* @date 2022/8/24*/
@Component
public class MqttMessageHandle implements MessageHandler {public static final Logger log = LoggerFactory.getLogger(MqttMessageHandle.class);// 包含 @MqttService注解 的类(Component)public static Map<String, Object> mqttServices;/*** 所有mqtt到达的消息都会在这里处理* 要注意这个方法是在线程池里面运行的* @param message message*/@Overridepublic void handleMessage(Message<?> message) throws MessagingException {getMqttTopicService(message);}public Map<String, Object> getMqttServices(){if(mqttServices==null){mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);}return mqttServices;}public void getMqttTopicService(Message<?> message){// 在这里 我们根据不同的 主题 分发不同的消息String receivedTopic = message.getHeaders().get("mqtt_receivedTopic",String.class);if(receivedTopic==null || "".equals(receivedTopic)){return;}for(Map.Entry<String, Object> entry : getMqttServices().entrySet()){// 把所有带有 @MqttService 的类遍历Class<?> clazz = entry.getValue().getClass();// 获取他所有方法Method[] methods = clazz.getDeclaredMethods();for ( Method method: methods ){if (method.isAnnotationPresent(MqttTopic.class)){// 如果这个方法有 这个注解MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);if(isMatch(receivedTopic,handleTopic.value())){// 并且 这个 topic 匹配成功try {method.invoke(SpringUtils.getBean(clazz),message);return;} catch (IllegalAccessException e) {e.printStackTrace();log.error("代理炸了");} catch (InvocationTargetException e) {log.error("执行 {} 方法出现错误",handleTopic.value(),e);}}}}}}/*** mqtt 订阅的主题与我实际的主题是否匹配* @param topic 是实际的主题* @param pattern 是我订阅的主题 可以是通配符模式* @return 是否匹配*/public static boolean isMatch(String topic, String pattern){if((topic==null) || (pattern==null) ){return false;}if(topic.equals(pattern)){// 完全相等是肯定匹配的return true;}if("#".equals(pattern)){// # 号代表所有主题  肯定匹配的return true;}String[] splitTopic = topic.split("/");String[] splitPattern = pattern.split("/");boolean match = true;// 如果包含 # 则只需要判断 # 前面的for (int i = 0; i < splitPattern.length; i++) {if(!"#".equals(splitPattern[i])){// 不是# 号 正常判断if(i>=splitTopic.length){// 此时长度不相等 不匹配match = false;break;}if(!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])){// 不相等 且不等于 +match = false;break;}}else {// 是# 号  肯定匹配的break;}}return match;}}

工具类 SpringUtils


import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;import java.util.Map;/*** spring工具类 方便在非spring管理环境中获取bean* */
@Component
public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware 
{/** Spring应用上下文环境 */private static ConfigurableListableBeanFactory beanFactory;private static ApplicationContext applicationContext;public static Map<String, Object> getBeansByAnnotation(Class clsName) throws BeansException{return beanFactory.getBeansWithAnnotation(clsName);}@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {SpringUtils.beanFactory = beanFactory;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {SpringUtils.applicationContext = applicationContext;}/*** 获取对象** @param name* @return Object 一个以所给名字注册的bean的实例* @throws org.springframework.beans.BeansException**/@SuppressWarnings("unchecked")public static <T> T getBean(String name) throws BeansException{return (T) beanFactory.getBean(name);}/*** 获取类型为requiredType的对象** @param clz* @return* @throws org.springframework.beans.BeansException**/public static <T> T getBean(Class<T> clz) throws BeansException{T result = (T) beanFactory.getBean(clz);return result;}/*** 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true** @param name* @return boolean*/public static boolean containsBean(String name){return beanFactory.containsBean(name);}/*** 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)** @param name* @return boolean* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException**/public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException{return beanFactory.isSingleton(name);}/*** @param name* @return Class 注册对象的类型* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException**/public static Class<?> getType(String name) throws NoSuchBeanDefinitionException{return beanFactory.getType(name);}/*** 如果给定的bean名字在bean定义中有别名,则返回这些别名** @param name* @return* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException**/public static String[] getAliases(String name) throws NoSuchBeanDefinitionException{return beanFactory.getAliases(name);}/*** 获取aop代理对象* * @param invoker* @return*/@SuppressWarnings("unchecked")public static <T> T getAopProxy(T invoker){return (T) AopContext.currentProxy();}/*** 获取当前的环境配置,无配置返回null** @return 当前的环境配置*/public static String[] getActiveProfiles(){return applicationContext.getEnvironment().getActiveProfiles();}}

OK, 大功告成. 终于舒服了, 终于不用写if...else...了, 个人感觉这样处理起来会更加优雅. 写代码最重要是什么, 是优雅~

以上!

参考文章:
使用 Spring integration 在Springboot中集成Mqtt
Spring Integration(一)概述

附:
动态添加主题方式:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Service;import java.util.Arrays;/*** MqttService** @author hengzi* @date 2022/8/25*/
@Service
public class MqttService {@Autowiredprivate MqttPahoMessageDrivenChannelAdapter adapter;public void addTopic(String topic) {addTopic(topic, 1);}public void addTopic(String topic,int qos) {String[] topics = adapter.getTopic();if(!Arrays.asList(topics).contains(topic)){adapter.addTopic(topic,qos);}}public void removeTopic(String topic) {adapter.removeTopic(topic);}}

直接调用就行


http://chatgpt.dhexx.cn/article/3rW2CLpa.shtml

相关文章

Springboot实现MQTT通信

目录 一、MQTT简介1、MQTT协议2、MQTT协议特点 二、MQTT服务器搭建三、使用Springboot整合MQTT协议1、在父工程下创建一个Springboot项目作为消息的提供者1.1 导入依赖包1.2 修改配置文件1.3 消息发布者客户端配置1.4 消息发布客户端回调1.5 创建控制器测试发布信息 2、在父工程…

一步一步来:MQTT服务器搭建、MQTT客户端使用

物联网应用如火如荼&#xff0c;本文就物联网应用中最受青睐的协议 MQTT相关测试工具的使用进行简单说明。 希望此文能给需要用到的朋友一些微薄的帮助…… 一、MQTT服务器&#xff08; emqx &#xff09;搭建 1. 下载服务器MQTT Broker 从 https://www.emqx.io/cn/mqtt/public…

【MQTT基础篇(三)】连接MQTT服务端

文章目录 连接MQTT服务端1 CONNECT – 连接服务端1.1 clientId – 客户端ID1.2 cleanSession – 清除会话1.3 keepAlive – 心跳时间间隔 2 CONNACK – 确认连接请求2.1 returnCode – 连接返回码2.2 sessionPresent – 当前会话 连接MQTT服务端 MQTT客户端之间要想实现通讯&am…

MQTT介绍与使用

目录 一、MQTT简介 二、特性 三、实现方式   四、MQTT的搭建&#xff08;ubuntu&#xff09; 五、MQTT权限配置 六、MQTT实现&#xff08;Java语言&#xff09; 正文 物联网是新一代信息技术的重要组成部分&#xff0c;也是“信息化”时代的重要发展阶段。其英文名称是&am…

什么是MQTT

1、MQTT来龙去脉 1.1 什么是MQTT MQTT(英文全称Message Queuing Telemetry Transport&#xff0c;消息队列遥测传输协议)是一种基于发布/订阅&#xff08;PUBLISH/SUBSCRIBE&#xff09;模式的轻量级的物联网通信协议。从这个定义中我们可以总结出四个关键词&#xff1a;消息队…

设备分配与spooling技术详解

5.4 设备分配&#xff08;重点&#xff01;&#xff01;&#xff01;&#xff09; 一、设备分配中的数据结构二、设备分配时应考虑的因素三、设备独立性四、设备独占的分配程序五、Spooling技术 一、设备分配中的数据结构 在进行设备分配时&#xff0c;通常都要借助一些表格…

SPOOLing和虚拟化

什么是虚拟化 虚拟化就是无中生有&#xff0c;就是暗度陈仓&#xff08;狗头保命&#xff09;。仔细观察整个计算机系统的设计&#xff0c;到处都体现着虚拟化的技术。当然虚拟化就是操作系统设计的要求之一。我们很好理解 CPU 的虚拟化技术&#xff0c;也就是通过进程调度实现…

操作系统 假脱机(Spooling)系统

介绍 通过多道程序技术可将一台物理CPU虚拟为多台逻辑CPU&#xff0c;从而允许多个用户共享一台主机。那么&#xff0c;假脱机技术&#xff0c;则可将一台物理I/O设备虚拟为多台逻辑I/O设备&#xff0c;这样也就允许多个用户共享一台物理I/O设备。 1. 假脱机技术 早期&#…

题目SPOOLing系统的设计与实现

最近刚刚做的一个课程设计&#xff0c;关于SPOOLing的。 一、算法或原理的实现思想 技术原理 SPOOLing技术可将一台物理I/O设备虚拟为多台逻辑I/O设备&#xff0c;同样允许多个用户共享一台物理I/O设备。SPOOLing技术把所有用户进程的输出都送入输出井&#xff0c;然后再由输出…

精确度,准确度,精密度关系

1.精确度&#xff0c;准确度&#xff0c;精密度的关系 三者得关系大体可以理解为&#xff0c;准确度精密度 精确度&#xff0c;准确度反应距离真值得偏差&#xff0c;精密度反应测量得稳定性&#xff0c;精确度反应二者之综合。 三者得主次关系&#xff1a;精密度>准确度 …

Mysql的浮点精确度

1.mysql的用于记录小数的类型有三个float ,decimal 和double他们之间的关系 先创建一个表test都用了float ,decimal 和double 插入一条数据查看发现没有发现精度丢失问题 再插入一条数据&#xff0c;发现精度损失&#xff1a; 查看三个类型的范围&#xff1a; 插入小数的位数多…

验证集精确度和损失同时上升

目录 1. 实验结果2. 分析 1. 实验结果 下图中val_acc&#xff0c;val_loss分别表示验证集精确度和损失&#xff1b;train_acc&#xff0c;train_loss分别表示训练集精确度和损失。验证集精确度一直上升&#xff0c;但是损失在第六个epoch后也开始上升&#xff0c;如何解释&…

JavaScript超大或超小数值精确度丢失解决方案

情景一 接口字段&#xff0c;Number类型数据失真&#xff0c;解决方法可直接让服务端把字段类型改成String类型即可。 情景二 某些特殊场景&#xff0c;需要保留小数点后9位(及其以上)&#xff0c;直接调用Number对象自带的toFixed()函数&#xff0c;会出现小数点后数据失真…

关于JavaScript精确度问题

一、js精确度的安全范围是 -2^53 至 2^53 一旦超过这个范围则无法精确表示 1.解决方法 使用第三方包 JSON-Bigint JSONbig.parse() //转换出来的是一个BigNubmer对象 若要使用则用toString()方法 JSONbig.stringify() 2.当axios获取响应数据时自动会将数据JSON.parse()解析为…

【机器学习】准确率、精确度、召回率和 F1 定义

一、说明 数据科学家选择目标变量后 - 例如他们希望预测电子表格中的“列”&#xff0c;并完成了转换数据和构建模型的先决条件&#xff0c;最后步骤之一是评估模型的性能。 二、混淆矩阵的模型 2.1 混淆矩阵 选择性能指标通常取决于要解决的业务问题。假设您的数据集中有 10…

Python计算分类问题的评价指标(准确率、精确度、召回率和F1值,Kappa指标)

机器学习的分类问题常用评论指标有&#xff1a;准确率、精确度、召回率和F1值&#xff0c;还有kappa指标 。 每次调包去找他们的计算代码很麻烦&#xff0c;所以这里一次性定义一个函数&#xff0c;直接计算所有的评价指标。 每次输入预测值和真实值就可以得到上面的指标值&a…

batch_size对精确度和损失的影响

1 问题 在深度学习的学习过程中&#xff0c;模型性能对batchsize虽然没有学习率那么敏感&#xff0c;但是在进一步提升模型性能时&#xff0c;batch_size就会成为一个非常关键的参数。 batch_size对精度和损失的影响研究。 batch_size [,32,64,128&#xff0c;256] 不同batch_…

准度、精度傻傻分不清?

[导读] 做电子产品&#xff0c;常常遇到测量。此时就难免会关注到精度、准度等概念&#xff0c;遇到不少朋友对这两个概念不清楚&#xff0c;今天就来分享一下这两个概念。最近很忙&#xff0c;更的不及时&#xff0c;实在抱歉。也感谢大家不离不弃&#xff01;对于更文分享这件…

机器学习笔记--classification_report精确度/召回率/F1值

classification_report简介 sklearn中的classification_report函数用于显示主要分类指标的文本报告&#xff0e;在报告中显示每个类的精确度&#xff0c;召回率&#xff0c;F1值等信息。 主要参数: y_true&#xff1a;1维数组&#xff0c;或标签指示器数组/稀疏矩阵&#xf…

YOLOv5~目标检测模型精确度

还是yolo5的基础啊~~ 一些关于目标检测模型的评估指标&#xff1a;IOU、TP&FP&FN&TN、mAP等&#xff0c;并列举了目标检测中的mAP计算。 指标评估(重要的一些定义) IOU 也称重叠度表示计算预测回归框和真实回归框的交并比,计算公式如下: TP&FP&FN&…