文章目录
- 说明
- 过程演示
- 文字展示视频操作过程
- 修改网关配置
- MQTT连接器配置
- JS模拟网关子设备
- 添加网关设备
- 启动网关
- 启动js模拟设备
- 创建开关小部件
- A
- B
- C
- D
- E
- MQTT连接器双向RPC的BUG修复
说明
通过下面案例了解MQTT连接器的使用,包括遥测,属性,单向双向RPC。
案例:有个开关传感器,且集成一些温湿度,温湿度上传为设备遥测,型号等上传为设备属性。单向RPC控制开关,双向RPC获取开关状态。
BUG:tb网关的MQTT连接器双向RPC存在问题,无法成功,视频和文章会演示如何解决BUG。
过程演示
ThingsBoard网关 mqtt连接器演示及双向RPC的BUG修复
文字展示视频操作过程
修改网关配置
thingsboard_gateway/config/tb_gateway.yaml
thingsboard:host: 192.168.7.198 # TB mqtt ipport: 1883 # TB mqtt端口remoteShell: falseremoteConfiguration: falsesecurity:accessToken: LmX4G3nhJXRr0zOk6mzL # mqtt网关设备accesstokenqos: 0
storage:type: memoryread_records_count: 100max_records_count: 100000
connectors: # 打开MQTT连接器-name: MQTT Broker Connectortype: mqttconfiguration: mqtt-test.json #指定mqtt连接器配置文件
MQTT连接器配置
{"broker": {"name":"EMQX Broker","host":"192.168.7.190","port":1883,"clientId": "ThingsBoard_gateway","security": {"type": "basic","username": "admin","password": "admin"}},"mapping": [{"topicFilter": "sensor/data","converter": {"type": "json","deviceNameJsonExpression": "${serialNumber}","deviceTypeJsonExpression": "${sensorType}","timeout": 60000,"attributes": [{"type": "string","key": "model","value": "${sensorModel}"},{"type": "string","key": "${sensorModel}","value": "on"}],"timeseries": [{"type": "double","key": "temperature","value": "${temp}"},{"type": "double","key": "humidity","value": "${hum}"},{"type": "boolean","key": "occupancy","value": "${occ}"},{"type": "int","key": "state","value": "${state}"}]}}],"connectRequests": [{"topicFilter": "sensor/connect","deviceNameJsonExpression": "${serialNumber}"}],"disconnectRequests": [{"topicFilter": "sensor/disconnect","deviceNameJsonExpression": "${serialNumber}"}],"attributeUpdates": [{"deviceNameFilter": ".*","attributeFilter": "uploadFrequency","topicExpression": "sensor/${serialNumber}/${attributeKey}","valueExpression": "{\"${attributeKey}\":\"${attributeValue}\"}"}],"serverSideRpc": [{"deviceNameFilter": ".*","methodFilter": "getState","requestTopicExpression": "sensor/${deviceName}/request/${methodName}","responseTopicExpression": "sensor/${deviceName}/response/${methodName}","responseTimeout": 10000,"valueExpression": "${params}"},{"deviceNameFilter": ".*","methodFilter": "setState","requestTopicExpression": "sensor/${deviceName}/request/${methodName}","valueExpression": "${params}"}]
}
JS模拟网关子设备
安装依赖 npm install mqtt
js文件名 mqtt_client.js
注意下面的日志格式,后面不在赘述。
var mqtt = require('mqtt');
console.log('start mqtt_client,prepare connect');
//连接emqx broker
var client = mqtt.connect('mqtt://192.168.7.190:1883', {username: 'admin',password: 'admin',
});//开关状态
state = 0;
var preTimer;
//设备连接回调
client.on('connect', function () {console.log('connected');//发送连接请求publish('sensor/connect', {serialNumber: 'SN-002',});//订阅服务端RPCclient.subscribe('sensor/+/request/+');//订阅设备属性更新client.subscribe('sensor/+/+');//不断的上传遥测preTimer = telemetry();
});//收到消息回调
client.on('message', function (topic, message) {console.log('on message:');console.log(topic);console.log(message.toString());console.log('--------------------------------------');let data = JSON.parse(message);if (new RegExp('sensor/+/request/+'.replace('+', '[^/]+')).test(topic)) {console.log('RegExp success');var levels = topic.split('/');if (levels[3] === 'getState') {console.log('receive getState');//回复 twoway RPCpublish('sensor/' + levels[1] + '/response/' + levels[3], {state: state,});}if (levels[3] === 'setState') {console.log('receive setState');state = data.stateValue;//重新发送遥测 js没有双向绑定只能这样操作了if (preTimer != null) {clearInterval(preTimer);}preTimer = telemetry();}}
});//发送遥测 返回定时器对象
function telemetry() {return setInterval(() => {publish('sensor/data', {serialNumber: 'SN-002',sensorType: 'default',sensorModel: 'SN-model','SN-model': 'on',temp: Math.random(),hum: Math.random(),occ: true,state: state,});}, 3000);
}
//发送数据+打印日志
function publish(topic, message) {client.publish(topic, JSON.stringify(message));log(topic, message);
}//控制台打印
function log(topic, message) {console.log('send message:');console.log(topic);console.log(JSON.stringify(message));console.log('===================================');
}
添加网关设备
启动网关
网关日志打印 tb_device_mqtt - 141 - connection SUCCESSb
代表网关连接TB MQTT服务器成功。
""2021-04-23 13:17:12" - INFO - [tb_gateway_service.py] - tb_gateway_service - 138 - Gateway started."
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 235 - MQTT Broker Connector connected to 192.168.7.190:1883 - successfully."
""2021-04-23 13:17:12" - INFO - [tb_loader.py] - tb_loader - 66 - Import JsonMqttUplinkConverter from /Users/weijixin/PycharmProjects/thingsboard-gateway/thingsboard_gateway/connectors/mqtt."
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 276 - Connector "MQTT Broker Connector" subscribe to sensor/data"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/data, subscription message id = 1"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/connect, subscription message id = 2"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/disconnect, subscription message id = 3"
""2021-04-23 13:17:12" - INFO - [tb_device_mqtt.py] - tb_device_mqtt - 141 - connection SUCCESS"
""2021-04-23 13:17:12" - INFO - [tb_gateway_mqtt.py] - tb_gateway_mqtt - 176 - Subscribed to *|* with id 2"
启动js模拟设备
控制台执行 node mqtt_client.js
前俩行日志打印如下,则js模拟设备成功连接emqx broker MQTT服务器
start mqtt_client,prepare connect
connected
接下来会发送一条连接请求,主题sensor/connect
到emqx broker,日志如下
send message:
sensor/connect
{"serialNumber":"SN-002"}
===================================
对应mqtt-test.json
配置
"connectRequests": [{"topicFilter": "sensor/connect","deviceNameJsonExpression": "${serialNumber}"}],
网关收到打印如下,SN-002
就是网关解析的子设备名
""2021-04-23 13:17:14" - INFO - [mqtt_connector.py] - mqtt_connector - 402 - Connecting device SN-002 of type default"
此时打开tb页面,设备列表出现设备SN-002
。
接着js会开始定时3s间隔不断的发送遥测,此时tb页面可以观察到客户端属性和遥测数据了。
创建开关小部件
创建Round switch``小部件
,图中的ADCDE部分一一解释。
A
机器翻译:属性/时间序列值键(仅当订阅属性/时间序列方法时)
应该就是从遥测或属性中找state
数据点,这个输入框我也比较迷糊,他的值用来做什么的?直接拿来做开关量的话,不需要转换函数吗?
B
通过发送输入框指定的双向RPC方法(有响应的请求),获取开关的值,并且用D部分的函数转换RPC返回值为true/false,从而改变开关状态。
C
通过发送输入框指定的RPC方法,改变子设备对应的开关量状态值,发送方法的参数用E部分的函数转换。
D
参考B的解释
E
点击开关按钮后会给此函数传入value,点击开,传入value就是true;点击关传入value就是false。然后经过自己编写的函数转换后作为C方法的参数发送。
小部件创建好了,应该会自动发送一个getState双向RPC请求
来确定自己目前是开还是关的状态,但是看看网关日志吧,报错了,反复调试后确定是BUG.下面说说这个BUG如何修复!
MQTT连接器双向RPC的BUG修复
开关小部件创建成功后,会发送getState的双向RPC,此时网关报错如下:
Exception in thread Thread-6:
Traceback (most recent call last):File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/threading.py", line 917, in _bootstrap_innerself.run()File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/threading.py", line 865, in runself._target(*self._args, **self._kwargs)File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 3452, in _thread_mainself.loop_forever(retry_first_connection=True)File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 1779, in loop_foreverrc = self.loop(timeout, max_packets)File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 1181, in looprc = self.loop_read(max_packets)File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 1572, in loop_readrc = self._packet_read()File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 2310, in _packet_readrc = self._packet_handle()File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 2936, in _packet_handlereturn self._handle_publish()File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 3216, in _handle_publishself._handle_on_message(message)File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 3444, in _handle_on_messageself.on_message(self, self._userdata, message)File "/Users/weijixin/PycharmProjects/thingsboard-gateway/thingsboard_gateway/connectors/mqtt/mqtt_connector.py", line 505, in _on_messageif message.topic in self.__gateway.rpc_requests_in_progress:
AttributeError: 'TBGatewayService' object has no attribute 'rpc_requests_in_progress'
关键就是最后一句'TBGatewayService' object has no attribute 'rpc_requests_in_progress'
说网关服务类没有属性rpc_requests_in_progress
打开网关服务类thingsboard_gateway/gateway/tb_gateway_service.py
找到这个属性看一下是这样的:
self.__rpc_requests_in_progress = {}
python类中俩个下划线 __
代表私有属性,所以在报错位置thingsboard_gateway/connectors/mqtt/mqtt_connector.py
是调用不到这个属性的,报错是肯定的。
if message.topic in self.__gateway.rpc_requests_in_progress:
if message.topic in self.__gateway.rpc_requests_in_progress:self.__gateway.rpc_with_reply_processing(message.topic, content)return None
修复方法就是打开tb_gateway_service.py
将属性改为公开属性,即:替换所有self.__rpc_requests_in_progress
为self.rpc_requests_in_progress
你以为这就完事了?
下面俩处不解释了,照着改了,双向RPC就没问题了。
-
tb_gateway_service.py
的rpc_with_reply_processing
方法,注释图中一行
-
tb_gateway_service.py
的register_rpc_request_timeout
方法
没改之前是这样的
改成这样
到这里就好啦,再试试双向RPC吧!