ThingsBoard网关mqtt连接器案例及双向RPC的BUG修复

article/2025/10/13 15:54:41

文章目录

  • 说明
  • 过程演示
  • 文字展示视频操作过程
    • 修改网关配置
    • MQTT连接器配置
    • JS模拟网关子设备
    • 添加网关设备
    • 启动网关
    • 启动js模拟设备
    • 创建开关小部件
      • A
      • B
      • C
      • D
      • E
  • MQTT连接器双向RPC的BUG修复

说明

通过下面案例了解MQTT连接器的使用,包括遥测,属性,单向双向RPC。
案例:有个开关传感器,且集成一些温湿度,温湿度上传为设备遥测,型号等上传为设备属性。单向RPC控制开关,双向RPC获取开关状态。
BUG:tb网关的MQTT连接器双向RPC存在问题,无法成功,视频和文章会演示如何解决BUG。

过程演示

ThingsBoard网关 mqtt连接器演示及双向RPC的BUG修复

文字展示视频操作过程

修改网关配置

thingsboard_gateway/config/tb_gateway.yaml

thingsboard:host: 192.168.7.198 # TB mqtt ipport: 1883 # TB mqtt端口remoteShell: falseremoteConfiguration: falsesecurity:accessToken: LmX4G3nhJXRr0zOk6mzL # mqtt网关设备accesstokenqos: 0
storage:type: memoryread_records_count: 100max_records_count: 100000
connectors: # 打开MQTT连接器-name: MQTT Broker Connectortype: mqttconfiguration: mqtt-test.json #指定mqtt连接器配置文件

MQTT连接器配置

{"broker": {"name":"EMQX Broker","host":"192.168.7.190","port":1883,"clientId": "ThingsBoard_gateway","security": {"type": "basic","username": "admin","password": "admin"}},"mapping": [{"topicFilter": "sensor/data","converter": {"type": "json","deviceNameJsonExpression": "${serialNumber}","deviceTypeJsonExpression": "${sensorType}","timeout": 60000,"attributes": [{"type": "string","key": "model","value": "${sensorModel}"},{"type": "string","key": "${sensorModel}","value": "on"}],"timeseries": [{"type": "double","key": "temperature","value": "${temp}"},{"type": "double","key": "humidity","value": "${hum}"},{"type": "boolean","key": "occupancy","value": "${occ}"},{"type": "int","key": "state","value": "${state}"}]}}],"connectRequests": [{"topicFilter": "sensor/connect","deviceNameJsonExpression": "${serialNumber}"}],"disconnectRequests": [{"topicFilter": "sensor/disconnect","deviceNameJsonExpression": "${serialNumber}"}],"attributeUpdates": [{"deviceNameFilter": ".*","attributeFilter": "uploadFrequency","topicExpression": "sensor/${serialNumber}/${attributeKey}","valueExpression": "{\"${attributeKey}\":\"${attributeValue}\"}"}],"serverSideRpc": [{"deviceNameFilter": ".*","methodFilter": "getState","requestTopicExpression": "sensor/${deviceName}/request/${methodName}","responseTopicExpression": "sensor/${deviceName}/response/${methodName}","responseTimeout": 10000,"valueExpression": "${params}"},{"deviceNameFilter": ".*","methodFilter": "setState","requestTopicExpression": "sensor/${deviceName}/request/${methodName}","valueExpression": "${params}"}]
}

JS模拟网关子设备

安装依赖 npm install mqtt
js文件名 mqtt_client.js
注意下面的日志格式,后面不在赘述。

var mqtt = require('mqtt');
console.log('start mqtt_client,prepare connect');
//连接emqx broker
var client = mqtt.connect('mqtt://192.168.7.190:1883', {username: 'admin',password: 'admin',
});//开关状态
state = 0;
var preTimer;
//设备连接回调
client.on('connect', function () {console.log('connected');//发送连接请求publish('sensor/connect', {serialNumber: 'SN-002',});//订阅服务端RPCclient.subscribe('sensor/+/request/+');//订阅设备属性更新client.subscribe('sensor/+/+');//不断的上传遥测preTimer = telemetry();
});//收到消息回调
client.on('message', function (topic, message) {console.log('on message:');console.log(topic);console.log(message.toString());console.log('--------------------------------------');let data = JSON.parse(message);if (new RegExp('sensor/+/request/+'.replace('+', '[^/]+')).test(topic)) {console.log('RegExp success');var levels = topic.split('/');if (levels[3] === 'getState') {console.log('receive getState');//回复 twoway RPCpublish('sensor/' + levels[1] + '/response/' + levels[3], {state: state,});}if (levels[3] === 'setState') {console.log('receive setState');state = data.stateValue;//重新发送遥测 js没有双向绑定只能这样操作了if (preTimer != null) {clearInterval(preTimer);}preTimer = telemetry();}}
});//发送遥测 返回定时器对象
function telemetry() {return setInterval(() => {publish('sensor/data', {serialNumber: 'SN-002',sensorType: 'default',sensorModel: 'SN-model','SN-model': 'on',temp: Math.random(),hum: Math.random(),occ: true,state: state,});}, 3000);
}
//发送数据+打印日志
function publish(topic, message) {client.publish(topic, JSON.stringify(message));log(topic, message);
}//控制台打印
function log(topic, message) {console.log('send message:');console.log(topic);console.log(JSON.stringify(message));console.log('===================================');
}

添加网关设备

image.png

启动网关

网关日志打印 tb_device_mqtt - 141 - connection SUCCESSb 代表网关连接TB MQTT服务器成功。

""2021-04-23 13:17:12" - INFO - [tb_gateway_service.py] - tb_gateway_service - 138 - Gateway started."
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 235 - MQTT Broker Connector connected to 192.168.7.190:1883 - successfully."
""2021-04-23 13:17:12" - INFO - [tb_loader.py] - tb_loader - 66 - Import JsonMqttUplinkConverter from /Users/weijixin/PycharmProjects/thingsboard-gateway/thingsboard_gateway/connectors/mqtt."
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 276 - Connector "MQTT Broker Connector" subscribe to sensor/data"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/data, subscription message id = 1"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/connect, subscription message id = 2"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/disconnect, subscription message id = 3"
""2021-04-23 13:17:12" - INFO - [tb_device_mqtt.py] - tb_device_mqtt - 141 - connection SUCCESS"
""2021-04-23 13:17:12" - INFO - [tb_gateway_mqtt.py] - tb_gateway_mqtt - 176 - Subscribed to *|* with id 2"

启动js模拟设备

控制台执行 node mqtt_client.js
前俩行日志打印如下,则js模拟设备成功连接emqx broker MQTT服务器

start mqtt_client,prepare connect
connected

接下来会发送一条连接请求,主题sensor/connect到emqx broker,日志如下

send message:
sensor/connect
{"serialNumber":"SN-002"}
===================================

对应mqtt-test.json配置

"connectRequests": [{"topicFilter": "sensor/connect","deviceNameJsonExpression": "${serialNumber}"}],

网关收到打印如下,SN-002就是网关解析的子设备名

""2021-04-23 13:17:14" - INFO - [mqtt_connector.py] - mqtt_connector - 402 - Connecting device SN-002 of type default"

此时打开tb页面,设备列表出现设备SN-002


接着js会开始定时3s间隔不断的发送遥测,此时tb页面可以观察到客户端属性和遥测数据了。

创建开关小部件

创建Round switch``小部件,图中的ADCDE部分一一解释。
image.png

A

机器翻译:属性/时间序列值键(仅当订阅属性/时间序列方法时)
应该就是从遥测或属性中找state数据点,这个输入框我也比较迷糊,他的值用来做什么的?直接拿来做开关量的话,不需要转换函数吗?

B

通过发送输入框指定的双向RPC方法(有响应的请求),获取开关的值,并且用D部分的函数转换RPC返回值为true/false,从而改变开关状态。

C

通过发送输入框指定的RPC方法,改变子设备对应的开关量状态值,发送方法的参数用E部分的函数转换。

D

参考B的解释

E

点击开关按钮后会给此函数传入value,点击开,传入value就是true;点击关传入value就是false。然后经过自己编写的函数转换后作为C方法的参数发送。


小部件创建好了,应该会自动发送一个getState双向RPC请求来确定自己目前是开还是关的状态,但是看看网关日志吧,报错了,反复调试后确定是BUG.下面说说这个BUG如何修复!

MQTT连接器双向RPC的BUG修复

开关小部件创建成功后,会发送getState的双向RPC,此时网关报错如下:

Exception in thread Thread-6:
Traceback (most recent call last):File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/threading.py", line 917, in _bootstrap_innerself.run()File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/threading.py", line 865, in runself._target(*self._args, **self._kwargs)File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 3452, in _thread_mainself.loop_forever(retry_first_connection=True)File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 1779, in loop_foreverrc = self.loop(timeout, max_packets)File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 1181, in looprc = self.loop_read(max_packets)File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 1572, in loop_readrc = self._packet_read()File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 2310, in _packet_readrc = self._packet_handle()File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 2936, in _packet_handlereturn self._handle_publish()File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 3216, in _handle_publishself._handle_on_message(message)File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 3444, in _handle_on_messageself.on_message(self, self._userdata, message)File "/Users/weijixin/PycharmProjects/thingsboard-gateway/thingsboard_gateway/connectors/mqtt/mqtt_connector.py", line 505, in _on_messageif message.topic in self.__gateway.rpc_requests_in_progress:
AttributeError: 'TBGatewayService' object has no attribute 'rpc_requests_in_progress'

关键就是最后一句'TBGatewayService' object has no attribute 'rpc_requests_in_progress'
说网关服务类没有属性rpc_requests_in_progress
打开网关服务类thingsboard_gateway/gateway/tb_gateway_service.py找到这个属性看一下是这样的:
self.__rpc_requests_in_progress = {}
python类中俩个下划线 __代表私有属性,所以在报错位置thingsboard_gateway/connectors/mqtt/mqtt_connector.py是调用不到这个属性的,报错是肯定的。
if message.topic in self.__gateway.rpc_requests_in_progress:

if message.topic in self.__gateway.rpc_requests_in_progress:self.__gateway.rpc_with_reply_processing(message.topic, content)return None

修复方法就是打开tb_gateway_service.py将属性改为公开属性,即:替换所有self.__rpc_requests_in_progressself.rpc_requests_in_progress
你以为这就完事了?
下面俩处不解释了,照着改了,双向RPC就没问题了。

  1. tb_gateway_service.pyrpc_with_reply_processing方法,注释图中一行
    在这里插入图片描述

  2. tb_gateway_service.pyregister_rpc_request_timeout方法
    没改之前是这样的
    在这里插入图片描述
    改成这样
    在这里插入图片描述
    到这里就好啦,再试试双向RPC吧!
    不点赞美女哭了.jpg


http://chatgpt.dhexx.cn/article/cRcA7t8e.shtml

相关文章

详解CAN总线:常用CAN连接器的使用方法

目录 1、9针DSUB 2、5针迷你C型接头 3、6针德驰DT04-6P 本文将分享几种常用的CAN连接器的连线和使用方法。 1、9针DSUB CiA推荐使用9针DSUB作为工业标准连接器,实物如下图所示: 引脚定义和说明如下所示: 2、5针迷你C型接头 DeviceNet协…

Exchange 2013接收连接器以及邮件客户端若干问题介绍

一. 什么是接收连接器? 上图显示出了Exchange 2013完整安装后默认的5种接收连接器。 接收连接器用于控制发送到Exchange组织的入站邮件流,不同的接收连接器负责监听不同的网络端口,独立控制客户的认证等。 接收连接器的作用域限于单台服务器&…

汽车常用连接器接口定义

OBD II 连接器 也称SAE J1962连接器,其包含有CAN线,K线,J1850总线。 1. J1962公接头(中间)和J1962母接头(下)标题 表1 OBD-II连接器端口定义 ContactGeneral allocation1,3,8,9,11,12,13 Assig…

Gen-Z 可扩展连接器和SFF-TA-1002规范。

Hello大家好,在最近的一个项目中,我遇到了一个问题,就是说Gen-Z连接器和符合SFF-1002规范的连接器是什么关系?二者是不是同一种连接器?带着问题,我准备给他小刀拉屁股——开开眼儿。 话说2017年6月份&…

线束音视频传输连接器FAKRA与HSD区别?

为了防止有人看不明白文中涉及的术语和定义,我这里先解释一下: FAKRA连接器 FachkreisAutomobil Connector FAKRA是一种射频信号连接器(以下简称FAKRA)。 HSD连接器 HighSpeed Data Connector HSD是一种高速数据连接器,支持USB2.0、LVDS、IEE…

FMC标准以及FMC连接器介绍

一、FMC标准 FMC标准描述了一个通用的模块,它是以一定范围的应用,环境和市场为目标的。 该标准由包括 FPGA 厂商和最终用户在内的公司联盟开发,旨在为基础板(载卡)上的 FPGA 提供标准的夹层板(子卡)尺寸、连接…

关于使用腾讯云HiFlow场景连接器每天提醒签到打卡

目录 前言: HiFlow: 配置:​编辑 设置执行条件 ​编辑 设置群发机器人: 缺点与不足: 总结: 前言: 在我们日常生活中总会有一些签到比如:我们的掘金签到,王者荣耀签…

七、Kafka Connector (连接器)

Kafka Connect 是一种用于在 Apache Kafka 和其他数据系统之间可扩展且可靠地流式传输数据的工具。它使快速定义将大型数据集移入和移出 Kafka 的连接器变得简单。Kafka Connect 可以摄取整个数据库或从应用程序服务器收集指标到 Kafka 主题中,使数据可用于低延迟的…

连接器是什么?连接器有什么作用?

连接器是什么? 连接器,即CONNECTOR。 在中国也称为连接器、插头和插座。 一般指连接器。 即连接两个有源设备以传输电流或信号的设备。 广泛应用于航空、航天、国防等军事系统。 连接器是我们电子工程技术人员经常接触到的部件。 SIM卡插座连接器是手机、…

一分二网线连接器正确使用方法

考虑到公司有很多同事需要连接pc的同时要连接无线路由器,但是每个工位只有一个网络面板。 所以就采购了一批一分二网线连接器 本来以为可以直接把一根网线分成两根直接使用,想法如下图: 只能说想法太天真!!&#xff0…

电路板常用连接器(接插件)介绍与选型建议(板对板连接器,板对线连接器,线对线连接器等)

提示:本文介绍常用的连接器,这些连接器适合常规的工作条件。本文适合刚接触电路板或者对连接器不熟悉的攻城狮阅读。 文章目录 前言一、连接器具体应该叫什么?二、连接器型号分类1.目前的型号是怎么来的和分类2.选型注意事项3.连接器的购买与…

常见的光纤连接器有哪些?光纤连接器的种类科普

光纤连接器根据光纤连接器结构类型的不同可以分为不同的类型,根据传输介质的不同可分为单模光纤连接器和多模光纤连接器; 适用于FC、SC、ST、D4、DIN、Biconic、MU、LC、MT等类型; 连接器按引脚端面可分为PC(UPC)和APC…

Wafer连接器的介绍

Wafer连接器通常是指连接器底座(芯片座)连接器,一般由金属件和塑料件组装而成。 与只有塑料件(有些有铁壳)的软管不同,它是由电线和端子组装而成的。wafer插座连接器由固定端电连接器,即阴接触件&#xff0…

连接器分类及选型

常用连接器分类 D型连接器 射频连接器 圆形连接器 背板连接器 RJ连接器(电话连接器、网口连接器) 扁平电缆连接器 功率连接器 端子类连接器 标准间距连接器 欧式连接器 光纤连接器 其他连接器 D型连接器 D型普通:9芯、15芯、25…

链接器 --- Linker

链接器 1. 背景 ​ 对于经常使用 IDE 的开发者,通常点击一个按钮就万事大吉了,这虽然极大简化了过程,但是对于我们C语言这些相对底层的开发者来说非常非常不友好,屏蔽了大量细节,不了解内部细节是非常可怕的&#xf…

arm-gcc链接器和链接脚本

本文主要介绍了链接器和链接脚本的基本内容。主要偏向于入门级以及常见容易混淆的知识点。 1. 链接器介绍 在现在软件工程中,程序一般都比较复杂,通常由多个源文件组成。在编译的过程中会对这些源文件进行汇编或者编译然后生成目标文件。这些目标文件一…

彻底理解链接器:一

目录 什么是链接器(Linker) 链接器可操作的元素 链接器是如何工作的 过程一:符号决议 c源文件中都有什么 目标文件里有什么 符号表(Symbol table) 符号表存放在哪里 符号决议的过程 实例说明undefined reference 过程二&#xff1…

【python】pip指定路径安装文件

在网上下载个tar.gz的安装包,用pip在指定目录安装 pip install --target路径 文件名 pip install --targetE:\work\zicai\pd_code\AutoTest3.7\shujia\venv\Lib\site-packages xlwings-0.18.0.tar.gz 指定下载pyecharts包1.7.0版本到执行路径 pip3 install -i h…

如何查看Ubuntu中Python的安装路径

ubuntu查看python安装路径 查找方法: python import sys pythonpath sys.executable print(pythonpath)Python的安装路径:/usr/bin/python 可提供远程搭建运行服务 不会调试运行的同学,你只需打开远程,会帮你搭建调试好一切&a…

如何找到python 安装路径

我觉得第二个方法更实用,直接按个快捷键F8就行了 第一个:打开python,或者命令行 输入 import sys for i in sys.path: ... print(i) 输出 C:\Users\ASUS\AppData\Local\Programs\Python\Python37\python37.zip C:\Users\ASUS\AppDat…