c#使用MQTT通信

article/2025/9/11 22:45:34

mqtt的功能请看百度百科,MQTT_百度百科。

这里简单说一下,mqtt的功能就是一个发布/订阅的功能

例如,接下来,我们做一个例子。现在有A服务器作为发布方,B客户端作为订阅方。如果A发布了消息,那么会向B发送消息,反之亦然,从而进行通信的功能。

一、A服务端代码

1.建立一个.net5的web api项目

2.nuget引用mqttnet

3.MqttService代码

using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace mqtt
{public class MqttService{public static MqttServer _mqttServer { get; set; }public static void PublishData(string data){var message = new MqttApplicationMessage{Topic = "topic1",Payload = Encoding.Default.GetBytes(data),QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,Retain = true  // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。};_mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) // 发送消息给有订阅 topic_01的客户端{SenderClientId = "Server_01"}).GetAwaiter().GetResult();}}
}

4.MqttHostService代码

using Microsoft.Extensions.Hosting;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;namespace mqtt
{public class MqttHostService : IHostedService, IDisposable{public void Dispose(){}const string ServerClientId = "SERVER";public Task StartAsync(CancellationToken cancellationToken){MqttServerOptionsBuilder optionsBuilder = new MqttServerOptionsBuilder();optionsBuilder.WithDefaultEndpoint();//optionsBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse("127.0.0.1"));optionsBuilder.WithDefaultEndpointPort(10086); // 设置 服务端 端口号optionsBuilder.WithConnectionBacklog(1000); // 最大连接数MqttServerOptions options = optionsBuilder.Build();MqttService._mqttServer = new MqttFactory().CreateMqttServer(options);MqttService._mqttServer.ClientConnectedAsync += _mqttServer_ClientConnectedAsync; //客户端连接事件MqttService._mqttServer.ClientDisconnectedAsync += _mqttServer_ClientDisconnectedAsync; // 客户端关闭事件MqttService._mqttServer.ApplicationMessageNotConsumedAsync += _mqttServer_ApplicationMessageNotConsumedAsync; // 消息接收事件MqttService._mqttServer.ClientSubscribedTopicAsync += _mqttServer_ClientSubscribedTopicAsync; // 客户端订阅主题事件MqttService._mqttServer.ClientUnsubscribedTopicAsync += _mqttServer_ClientUnsubscribedTopicAsync; // 客户端取消订阅事件MqttService._mqttServer.StartedAsync += _mqttServer_StartedAsync; // 启动后事件MqttService._mqttServer.StoppedAsync += _mqttServer_StoppedAsync; // 关闭后事件MqttService._mqttServer.InterceptingPublishAsync += _mqttServer_InterceptingPublishAsync; // 消息接收事件MqttService._mqttServer.ValidatingConnectionAsync += _mqttServer_ValidatingConnectionAsync; // 用户名和密码验证有关MqttService._mqttServer.StartAsync();return Task.CompletedTask;}/// <summary>/// 客户端订阅主题事件/// </summary>/// <param name="arg"></param>/// <returns></returns>private Task _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg){Console.WriteLine($"ClientSubscribedTopicAsync:客户端ID=【{arg.ClientId}】订阅的主题=【{arg.TopicFilter}】 ");return Task.CompletedTask;}/// <summary>/// 关闭后事件/// </summary>/// <param name="arg"></param>/// <returns></returns>private Task _mqttServer_StoppedAsync(EventArgs arg){Console.WriteLine($"StoppedAsync:MQTT服务已关闭……");return Task.CompletedTask;}/// <summary>/// 用户名和密码验证有关/// </summary>/// <param name="arg"></param>/// <returns></returns>private Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg){arg.ReasonCode = MqttConnectReasonCode.Success;if ((arg.Username ?? string.Empty) != "admin" || (arg.Password ?? String.Empty) != "123456"){arg.ReasonCode = MqttConnectReasonCode.Banned;Console.WriteLine($"ValidatingConnectionAsync:客户端ID=【{arg.ClientId}】用户名或密码验证错误 ");}return Task.CompletedTask;}/// <summary>/// 消息接收事件/// </summary>/// <param name="arg"></param>/// <returns></returns>private Task _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg){if (string.Equals(arg.ClientId, ServerClientId)){return Task.CompletedTask;}Console.WriteLine($"InterceptingPublishAsync:客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");return Task.CompletedTask;}/// <summary>/// 启动后事件/// </summary>/// <param name="arg"></param>/// <returns></returns>private Task _mqttServer_StartedAsync(EventArgs arg){Console.WriteLine($"StartedAsync:MQTT服务已启动……");return Task.CompletedTask;}/// <summary>/// 客户端取消订阅事件/// </summary>/// <param name="arg"></param>/// <returns></returns>private Task _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg){Console.WriteLine($"ClientUnsubscribedTopicAsync:客户端ID=【{arg.ClientId}】已取消订阅的主题=【{arg.TopicFilter}】  ");return Task.CompletedTask;}private Task _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg){Console.WriteLine($"ApplicationMessageNotConsumedAsync:发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");return Task.CompletedTask;}/// <summary>/// 客户端断开时候触发/// </summary>/// <param name="arg"></param>/// <returns></returns>/// <exception cref="NotImplementedException"></exception>private Task _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg){Console.WriteLine($"ClientDisconnectedAsync:客户端ID=【{arg.ClientId}】已断开, 地址=【{arg.Endpoint}】  ");return Task.CompletedTask;}/// <summary>/// 客户端连接时候触发/// </summary>/// <param name="arg"></param>/// <returns></returns>private Task _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg){Console.WriteLine($"ClientConnectedAsync:客户端ID=【{arg.ClientId}】已连接, 用户名=【{arg.UserName}】地址=【{arg.Endpoint}】  ");return Task.CompletedTask;}public Task StopAsync(CancellationToken cancellationToken){return Task.CompletedTask;}}
}

5.在Startup注入代码

 services.AddHostedService<MqttHostService>();

6.增加一个服务器发送消息的方法

  public IActionResult Test(string data){MqttService.PublishData(data);return Ok();}

 二、B客户端

1.建立一个.net5  web api项目

2.nuget引用mqttnet

 3.MqttClientService代码

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace mqttClient
{public class MqttClientService{public static IMqttClient _mqttClient;public void MqttClientStart(){var optionsBuilder = new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", 10086) // 要访问的mqtt服务端的 ip 和 端口号.WithCredentials("admin", "123456") // 要访问的mqtt服务端的用户名和密码.WithClientId("testclient02") // 设置客户端id.WithCleanSession().WithTls(new MqttClientOptionsBuilderTlsParameters{UseTls = false  // 是否使用 tls加密});var clientOptions = optionsBuilder.Build();_mqttClient = new MqttFactory().CreateMqttClient();_mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; // 客户端连接成功事件_mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; // 客户端连接关闭事件_mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件_mqttClient.ConnectAsync(clientOptions);}/// <summary>/// 客户端连接关闭事件/// </summary>/// <param name="arg"></param>/// <returns></returns>private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg){Console.WriteLine($"客户端已断开与服务端的连接……");return Task.CompletedTask;}/// <summary>/// 客户端连接成功事件/// </summary>/// <param name="arg"></param>/// <returns></returns>private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg){Console.WriteLine($"客户端已连接服务端……");// 订阅消息主题// MqttQualityOfServiceLevel: (QoS):  0 最多一次,接收者不确认收到消息,并且消息不被发送者存储和重新发送提供与底层 TCP 协议相同的保证。// 1: 保证一条消息至少有一次会传递给接收方。发送方存储消息,直到它从接收方收到确认收到消息的数据包。一条消息可以多次发送或传递。// 2: 保证每条消息仅由预期的收件人接收一次。级别2是最安全和最慢的服务质量级别,保证由发送方和接收方之间的至少两个请求/响应(四次握手)。_mqttClient.SubscribeAsync("topic1", MqttQualityOfServiceLevel.AtLeastOnce); //topic_02return Task.CompletedTask;}/// <summary>/// 收到消息事件/// </summary>/// <param name="arg"></param>/// <returns></returns>private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg){Console.WriteLine($"ApplicationMessageReceivedAsync:客户端ID=【{arg.ClientId}】接收到消息。 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");return Task.CompletedTask;}public void Publish(string data){var message = new MqttApplicationMessage{Topic = "topic2",Payload = Encoding.Default.GetBytes(data),QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,Retain = true  // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。};_mqttClient.PublishAsync(message);}}
}

4.启动的时候,实例化调用,并且发送消息456

  三、效果

1.同时启动2个项目

 2.在服务端的api中输入发送的消息。

3.可以看到B客户端收到了消息

4.我在B客户端中点击回车,就把刚才说的456发送给了A服务端。

 注意:一个发布,一个订阅,发布的Topic和订阅的Topic一定要是同一个,就如同打电话一样,双方要在同一条线上。

如果运行不了,就换一台电脑,可能是系统组件的问题,我就遇到了这个问题。

拓展

实际上,服务器的代码可以不用写,我们只用写客户端的代码就行了。客户端控制发布和订阅的Topic即可,服务器只是一个转换的作用。

一、服务器安装

1.目前MQTT代理的主流平台有下面几个

  • Mosquitto:https://mosquitto.org/
  • VerneMQ:https://vernemq.com/
  • EMQTT:http://emqtt.io/

 2.我们选择第一个进行下载,直接安装

3.安装完成后,进入文件夹,按住Shift,右键鼠标点击空白处,然后打开Powershell,正常打开一个终端软件即可

  • 输入./mosquitto.exe -h 可以查看相应的帮助;
  • 输入./mosquitto.exe -p 10086,就开启了MQTT服务,监听的地址是127.0.0.1,端口是10086

 

二、客户端依然使用上面的代码

1.因为我们要进行2个客户端直接通信,那么就要修改对应发布和订阅的Topic 

客户端1

修改3个地方,生产一个EXE 

客户端2 

 修改3个地方,生产一个EXE 。

2.分别生产了2个文件夹的EXE

3.分别运行,效果,可以看到2个客户端互相给对方发送了456的消息。

三、代码链接

mqtt,mqtt,mqtt,mqtt-C#文档类资源-CSDN下载


http://chatgpt.dhexx.cn/article/Sjb9Dg58.shtml

相关文章

MQTT-java使用说明

MQTT-java使用说明 本文的资料下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1OCfsQ_NqcehKy86kYkA-wg?pwd1234 提取码&#xff1a;1234 MQTT基本介绍 MQTT是一个客户端服务端架构的发布/订阅模式的消息传输协议。 特点 易于实现数据传输的服务质量可控占用…

MQTT服务器的搭建与MQTT客户端的使用

目录 一、MQTT服务器&#xff08;emqx&#xff09;搭建1、下载 MQTT Broker2、下载完成后&#xff0c;解压的目录内容如下&#xff1a;3、进入 bin目录下4、直接在当前目录下输入cmd5、启动 emqx6、重置Dashboard 密码7、输入账号密码进入MQTT服务器控制台8、正常登录的界面如下…

Android+MQTT(Android APP 连接至MQTT服务器)

这段时间因为毕设的需求&#xff0c;接触学习了一下用Android app连接MQTT服务器&#xff0c;然后对下位机&#xff08;STM32/ESP8266&#xff09;进行数据通讯的一个小设计&#xff0c;本篇文章介绍appMQTT服务器这一段。 实现原理&#xff1a; 这里设计的是一个监测控制智能…

MQTT:windows最简单搭建mqtt服务端及本地客户端测试

一、mqtt服务端搭建 推荐使用emq的开源服务端emqx&#xff0c;目前是使用最广泛的mqtt服务端。 github上下载地址为https://github.com/emqx/emqx/releases&#xff0c;当前最新windows版本是 emqx-windows-4.3.6.zip。 1.运行服务端 下载完解压缩出来&#xff0c;在bin目…

springboot 整合 mqtt

springboot 整合 mqtt 最近由于iot越来越火, 物联网的需求越来越多, 那么理所当然的使用mqtt的场景也就越来越多, 接下来是我使用springboot整合mqtt的过程, 以及踩过的一些坑. mqtt服务器使用的是 EMQX, 官网 : 这里 搭建的时候如果你使用的是集群 记得开放以下端口: 好了, …

Springboot实现MQTT通信

目录 一、MQTT简介1、MQTT协议2、MQTT协议特点 二、MQTT服务器搭建三、使用Springboot整合MQTT协议1、在父工程下创建一个Springboot项目作为消息的提供者1.1 导入依赖包1.2 修改配置文件1.3 消息发布者客户端配置1.4 消息发布客户端回调1.5 创建控制器测试发布信息 2、在父工程…

一步一步来:MQTT服务器搭建、MQTT客户端使用

物联网应用如火如荼&#xff0c;本文就物联网应用中最受青睐的协议 MQTT相关测试工具的使用进行简单说明。 希望此文能给需要用到的朋友一些微薄的帮助…… 一、MQTT服务器&#xff08; emqx &#xff09;搭建 1. 下载服务器MQTT Broker 从 https://www.emqx.io/cn/mqtt/public…

【MQTT基础篇(三)】连接MQTT服务端

文章目录 连接MQTT服务端1 CONNECT – 连接服务端1.1 clientId – 客户端ID1.2 cleanSession – 清除会话1.3 keepAlive – 心跳时间间隔 2 CONNACK – 确认连接请求2.1 returnCode – 连接返回码2.2 sessionPresent – 当前会话 连接MQTT服务端 MQTT客户端之间要想实现通讯&am…

MQTT介绍与使用

目录 一、MQTT简介 二、特性 三、实现方式   四、MQTT的搭建&#xff08;ubuntu&#xff09; 五、MQTT权限配置 六、MQTT实现&#xff08;Java语言&#xff09; 正文 物联网是新一代信息技术的重要组成部分&#xff0c;也是“信息化”时代的重要发展阶段。其英文名称是&am…

什么是MQTT

1、MQTT来龙去脉 1.1 什么是MQTT MQTT(英文全称Message Queuing Telemetry Transport&#xff0c;消息队列遥测传输协议)是一种基于发布/订阅&#xff08;PUBLISH/SUBSCRIBE&#xff09;模式的轻量级的物联网通信协议。从这个定义中我们可以总结出四个关键词&#xff1a;消息队…

设备分配与spooling技术详解

5.4 设备分配&#xff08;重点&#xff01;&#xff01;&#xff01;&#xff09; 一、设备分配中的数据结构二、设备分配时应考虑的因素三、设备独立性四、设备独占的分配程序五、Spooling技术 一、设备分配中的数据结构 在进行设备分配时&#xff0c;通常都要借助一些表格…

SPOOLing和虚拟化

什么是虚拟化 虚拟化就是无中生有&#xff0c;就是暗度陈仓&#xff08;狗头保命&#xff09;。仔细观察整个计算机系统的设计&#xff0c;到处都体现着虚拟化的技术。当然虚拟化就是操作系统设计的要求之一。我们很好理解 CPU 的虚拟化技术&#xff0c;也就是通过进程调度实现…

操作系统 假脱机(Spooling)系统

介绍 通过多道程序技术可将一台物理CPU虚拟为多台逻辑CPU&#xff0c;从而允许多个用户共享一台主机。那么&#xff0c;假脱机技术&#xff0c;则可将一台物理I/O设备虚拟为多台逻辑I/O设备&#xff0c;这样也就允许多个用户共享一台物理I/O设备。 1. 假脱机技术 早期&#…

题目SPOOLing系统的设计与实现

最近刚刚做的一个课程设计&#xff0c;关于SPOOLing的。 一、算法或原理的实现思想 技术原理 SPOOLing技术可将一台物理I/O设备虚拟为多台逻辑I/O设备&#xff0c;同样允许多个用户共享一台物理I/O设备。SPOOLing技术把所有用户进程的输出都送入输出井&#xff0c;然后再由输出…

精确度,准确度,精密度关系

1.精确度&#xff0c;准确度&#xff0c;精密度的关系 三者得关系大体可以理解为&#xff0c;准确度精密度 精确度&#xff0c;准确度反应距离真值得偏差&#xff0c;精密度反应测量得稳定性&#xff0c;精确度反应二者之综合。 三者得主次关系&#xff1a;精密度>准确度 …

Mysql的浮点精确度

1.mysql的用于记录小数的类型有三个float ,decimal 和double他们之间的关系 先创建一个表test都用了float ,decimal 和double 插入一条数据查看发现没有发现精度丢失问题 再插入一条数据&#xff0c;发现精度损失&#xff1a; 查看三个类型的范围&#xff1a; 插入小数的位数多…

验证集精确度和损失同时上升

目录 1. 实验结果2. 分析 1. 实验结果 下图中val_acc&#xff0c;val_loss分别表示验证集精确度和损失&#xff1b;train_acc&#xff0c;train_loss分别表示训练集精确度和损失。验证集精确度一直上升&#xff0c;但是损失在第六个epoch后也开始上升&#xff0c;如何解释&…

JavaScript超大或超小数值精确度丢失解决方案

情景一 接口字段&#xff0c;Number类型数据失真&#xff0c;解决方法可直接让服务端把字段类型改成String类型即可。 情景二 某些特殊场景&#xff0c;需要保留小数点后9位(及其以上)&#xff0c;直接调用Number对象自带的toFixed()函数&#xff0c;会出现小数点后数据失真…

关于JavaScript精确度问题

一、js精确度的安全范围是 -2^53 至 2^53 一旦超过这个范围则无法精确表示 1.解决方法 使用第三方包 JSON-Bigint JSONbig.parse() //转换出来的是一个BigNubmer对象 若要使用则用toString()方法 JSONbig.stringify() 2.当axios获取响应数据时自动会将数据JSON.parse()解析为…

【机器学习】准确率、精确度、召回率和 F1 定义

一、说明 数据科学家选择目标变量后 - 例如他们希望预测电子表格中的“列”&#xff0c;并完成了转换数据和构建模型的先决条件&#xff0c;最后步骤之一是评估模型的性能。 二、混淆矩阵的模型 2.1 混淆矩阵 选择性能指标通常取决于要解决的业务问题。假设您的数据集中有 10…