tcp client

article/2025/10/14 5:14:36

Mina 自定义硬件通讯协议框架搭建(TCP Client)

2018.03.04 18:49:29字数 1057阅读 2323

Apache MINA 是一个能够帮助用户开发高性能和高伸缩性网络应用程序的框架。它通过Java nio技术基于TCP/IP和UDP/IP协议提供了抽象的、事件驱动的、异步的API。

使用背景

大三读完,出去实习。接触到的第一个框架。我本是一名JAVA黑 微笑, 奈何实习公司仅JAVA开发,好吧,路转粉。
由于是边学边做, 难免在学习过程中遇到了很多坑,不过最终还是解决了,这里就不一一叙述了。
本文将根据自己搭建的框架的过程写此文(参考了很多前辈的代码,总结出来)。

项目背景

项目是根据一套设备(其实是一种超大型LED屏幕)厂商提供的通讯协议去控制设备,该协议是硬件厂家自己定制。
设备数目: 450套

自定义词说明

名称说明
VMS指设备

通讯协议

名称说明
类型TCP/IP
客户端MINA
服务端VMS

数据包格式

包头、类型、数据长度、数据、校验码、包尾
名称偏移位置长度值范围
包头01固定字符‘*’
类型110-127
数据长度22数据字节数
数据4n协议类型决定
校验码n+42类型、数据长度、数据三部分的所有字节的CRC-16码
包尾n+61字符‘#’
转义问题: 其中字符'\'为转意符,所有除头尾外的字节,如果是'*','#','\'在通讯时换成"\*","\#","\\" 。 大小端问题:凡涉及多字节数据均为低字节在前

创建项目

开发环境: IntelliJ IDEA (学生可以免费申请收费版(hhh)),创建的是一个Maven的工程。

<!-- 日志工具-->
<dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.7</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.21</version>
</dependency><!-- MINA  -->
<dependency><groupId>org.apache.mina</groupId><artifactId>mina-core</artifactId><version>2.0.7</version>
</dependency>

目录结构

|-- pom.xml
`-- src|-- com|   `-- taoroot|       `-- mina|           |-- protocol                                // 协议 目录|           |   |-- IMessageBody.java|           |   |-- MyBuffer.java                       // IoBuffer 封装|           |   |-- VMSMessage.java                     |           |   |-- VMSMessageFactory.java              // 解析工厂|           |   |-- VMSMessageHeader.java               // 表头部分内容|           |   |-- VMS_00.java                         // 心跳数据包|           |   |-- VMS_14.java                         // 巡检数据包|           |-- client                                  // Mina 目录|           |   |-- ClientSessionHandler.java|           |   |-- ConnectListener.java|           |   |-- MinaClient.java                     // 程序路口|           |   |-- ExceptionHandler.java               // 心跳包超时处理类|           |   |-- VMSKeepAliveFilter.java             // 心跳包拦截器|           |   |-- VMSKeepAliveMessageFactory.java     // 心跳包工厂|           |   |-- VMSMessageCodecFactory.java     |           |   |-- VMSMessageDecoder.java|           |   `-- VMSMessageEncoder.java|           `-- util|               |-- ByteUtil.java|               |-- ClassUtils.java|               |-- ConfigUtil.java|               |-- CrcCodeUtil.java|               |-- MinaUtil.java|               |-- RGBUtil.java|-- config.properties                                   // 默认配置`-- log4j.properties                                    // 日志配置

启动服务 MinaClient

vmsMap:

用来接收来自设备列表
key: 设备编号, value: 设备地址
编号是唯一的,后端下发指令时,用此编号区分设备

sessionMap

设备对应的session
key: 设备编号, value: IoSession
只有连接成功的设备才会存入,所以在下一次获取列表后,将重新尝试连接。

初始化Mina以后, 系统将定时从后端获取设备列表存入vmsMap中,与sessionMap中的编号比较。
有几次情况需要考虑:

情况1. vmsMap中有, sessionMap中没有: 尝试创建IoSession,创建成功则加入sessionMap。
情况2. vmsMap中有, sessionMap中有,但设备地址发生改变: 关闭现有session, 然后按情况1处理。
情况3. vmsMap中没有,sessionMap中有, 关闭现有session,从sessionMap移除。

流程图

getList.png

代码实现

    // Mina 初始化配置connector = new NioSocketConnector();connector.setConnectTimeoutMillis(MinaUtil.CONNECT_TIMEOUT);// ioBuffer 日志(实际生产环境不需要)connector.getFilterChain().addLast( "logger", new LoggingFilter() );// 解码过滤层 (数据包转对象)connector.getFilterChain().addLast("vms_coder", new ProtocolCodecFilter(new VMSMessageCodecFactory()));// 超时过滤层 (对TCP在线,心跳包超时的设备主动断开连接)KeepAliveFilter heartBeat = new VMSKeepAliveFilter(new VMSKeepAliveMessageFactory());// 设置心跳频率heartBeat.setRequestInterval((int) MinaUtil.HEART_BEAT_RATE);connector.getFilterChain().addLast("heartbeat", heartBeat);// 业务处理类connector.setHandler(new ClientSessionHandler());IoSession session;// MQ 初始化// Mina本身不知道设备的网络地址,是通过订阅形式,从后端获取过来TopicSender.createTopic(VMS_SCREEN_LIST_TOPIC);MQListener.init();for (; ; ) {// 获取列表 (0是设备列表信息)MinaUtil.getDeviceScreen("", 0);// 是否被锁if (!StaticUtil.isIsRefreshDeviceMap()) {// 关锁StaticUtil.setIsRefreshDeviceMap(true);// 遍历设备列表for (String devNo : deviceMap.keySet()) {// 新设备上线, 创建新连接if (!ioSessionMap.containsKey(devNo)) {newSocket(devNo, deviceMap.get(devNo));} else {// 查看设备对应的地址是否变化session = ioSessionMap.get(devNo);String currentAddress = "";  // 当前地址String newAddressPort = deviceMap.get(devNo);  // 新地址// 从session中获取出当前连接的地址currentAddress = getAddressPort(session);// 地址发生改变if (!currentAddress.equals(newAddressPort)) {session.close(true);    // 关闭目前连接的sessionnewSocket(devNo, newAddressPort);    // 用新地址创建连接}}}// 设备列表中已经删除了编号, session如果存在,也需要断开连接Iterator<Map.Entry<String, IoSession>> it = ioSessionMap.entrySet().iterator();while (it.hasNext()) {Map.Entry<String, IoSession> entry = it.next();String devNo = entry.getKey();session = entry.getValue();if (!deviceMap.containsKey(devNo)) {it.remove();// 从session中获取出当前连接的地址String currentAddress = getAddressPort(session);session.close(true);sendOnlineStatus(devNo, currentAddress, VMS_OFFLINE_STATUS);logger.info("设备: " + devNo + " 离线");}}// 开锁StaticUtil.setIsRefreshDeviceMap(false);}// 休眠一段时间在去获取设备列表try {Thread.sleep(REFRESH_DEVICE_LIST_TIME * 1000);}catch (Exception e) {e.toString();}}}

编解码工厂 VMSMessageCodecFactory

public class VMSMessageCodecFactory implements ProtocolCodecFactory {private final VMSMessageDecoder decoder;       // 解码器private final VMSMessageEncoder encoder;       // 编码器
}

解码器

粘包

两个数据包的部分数据相连接

解码流程图

// todo

解码器源代码

public class VMSMessageDecoder extends CumulativeProtocolDecoder {private static Logger logger = Logger.getLogger(VMSMessageDecoder.class);protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {if (in.remaining() < 1) {return false;}in.mark();byte[] data = new byte[in.remaining()];in.get(data);int pos = 0;in.reset();while (in.remaining() > 0) {in.mark();byte tag = in.get();//搜索包的开始位置if (tag == 0x2A && in.remaining() > 0) {tag = in.get();//寻找包的结束while (tag != 0x23) {if (in.remaining() <= 0) {in.reset(); //没有找到结束包,等待下一次包return false;}tag = in.get();}pos = in.position();int packetLength = pos - in.markValue();if (packetLength > 1) {byte[] tmp = new byte[packetLength];in.reset();in.get(tmp);//解析VMSMessage message = new VMSMessage();message.ReadFromBytes(tmp);out.write(message); //触发接收Message的事件}}}return false;}
}

编码器

编码器相对就简单很多。

编码器源代码

public class VMSMessageEncoder extends ProtocolEncoderAdapter {@Overridepublic void encode(IoSession ioSession, Object message, ProtocolEncoderOutput out) throws Exception {IoBuffer buf = IoBuffer.allocate(500).setAutoExpand(true);VMSMessage vmsMessage = (VMSMessage) message;buf.put(vmsMessage.WriteToBytes());buf.flip();out.write(buf);out.flush();buf.free();}
}

心跳包机制

vms 和 mina 之间需要不能有超过2分钟的空闲状态。 需要定期发送心跳包,否则vms将进入离线状态,停止播放。
发送过程如下: mina 进入空闲状态, 发送心跳包, vms回发心跳包, mina 接收到心跳包。如果未在规定的时间内,接收到心跳包,超过三次,将主动关闭session。
mina 其实自带了一套心跳包拦截器,(上文mina配置代码)。可以将心跳包在IoHandler前处理掉,就不用再业务层去关心了。

MyKeepAliveFilter

public class MyKeepAliveFilter extends KeepAliveFilter {private static final int TIMEOUT = CmdOptionHandler.getTimeout();public MyKeepAliveFilter(KeepAliveMessageFactory messageFactory) {// super(心跳包工厂, 两遍都是空闲状态, 超时处理类, 上发超时时间, 下发超时时间)super(messageFactory, IdleStatus.BOTH_IDLE, new MyKeepAliveRequestTimeoutHandler(), TIMEOUT, TIMEOUT);//此消息不会继续传递,不会被业务层看见this.setForwardEvent(false);}
}

VMSKeepAliveMessageFactory

主要有两个功能:
第一,判断是否是心跳包,
第二,生成一个心跳包数据。
KeepAliveMessageFactory 提供了四个接口,当初看了网上教程说什么半工,双工什么的,一头雾水,最后硬着头皮,源代码了。当时猜测是 sessionIdle 发送了心跳包, messageReceived 接收心跳包。

KeepAliveFilter 解读

// 空闲状态触发
public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception {if (status == interestedIdleStatus) {if (!session.containsAttribute(WAITING_FOR_RESPONSE)) {// 看来 getRequest 是用来获取一个发送心跳包数据接口Object pingMessage = messageFactory.getRequest(session);if (pingMessage != null) {nextFilter.filterWrite(session, new DefaultWriteRequest(pingMessage));if (getRequestTimeoutHandler() != KeepAliveRequestTimeoutHandler.DEAF_SPEAKER) {markStatus(session);if (interestedIdleStatus == IdleStatus.BOTH_IDLE) {session.setAttribute(IGNORE_READER_IDLE_ONCE);}} else {resetStatus(session);}}} else {handlePingTimeout(session);}} 
}
// 接收到数据触发
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {try {// 判断数据包是否是心跳包,if (messageFactory.isRequest(session, message)) {// 这里又获取了一个心跳包,所以说这里是用来判断,vms主动发送心跳包过来,然后mina回应一个心跳包// 在我们设备中,设备不主动发送心跳包,也不对设备主动发送心跳包做一个回应,所以,不需要这个逻辑// 所以让 isRequest 放回 false 就行, Object pongMessage = messageFactory.getResponse(session, message);// 保险起见,让 getResponse 也返回nullif (pongMessage != null) {nextFilter.filterWrite(session, new DefaultWriteRequest(pongMessage));}}// 这里也是判断心跳包,if (messageFactory.isResponse(session, message)) {// 里面是清空了 mina在sessionIdl下发送的心跳包标志位// 所以这心跳包 是用来是vms回发的心跳响应包。resetStatus(session);}} finally {if (!isKeepAliveMessage(session, message)) {nextFilter.messageReceived(session, message);}}
}

好了,到这里就知道怎么去写 VMSKeepAliveMessageFactory 了。
当初看教程的死活没看懂,还弄出了死循环,vms、mina一直在对送心跳包。有时候源代码才是最好的教程呀(hhh)。

心跳包计数器(业务逻辑扩展)

心跳包是在空闲状态下会以一定的频率发送,业务层需要有一个定时巡检的数据包,vms会返回自身状态信息,所以我就把这个单做了了一个计数器使用。mina每次发送一个心跳包,就自增一,达到触发值就发送状态包。
巡检间隔 = 心跳包发送间隔 * 触发值

VMSKeepAliveMessageFactory 源代码

public class VMSKeepAliveMessageFactory implements KeepAliveMessageFactory {private final static org.slf4j.Logger logger = LoggerFactory.getLogger(ClientSessionHandler.class);private VMSMessage vmsMessage;// 发送心跳包@Overridepublic Object getRequest(IoSession arg0) {heartCountAdd(arg0);vmsMessage = new VMSMessage();vmsMessage.setMessageContents(new VMS_00());return vmsMessage;}// 拦截心跳包@Overridepublic boolean isResponse(IoSession session, Object message) {return isHeartPage(message);}@Overridepublic Object getResponse(IoSession arg0, Object arg1) {return null;}@Overridepublic boolean isRequest(IoSession session, Object message) {return false;}//  心跳包 计数器private void heartCountAdd(IoSession session) {int heartCounter = (int) session.getAttribute(MinaUtil.VMS_HEARTBEAT_COUNT_ATTRIBUTE) + 1;session.setAttribute(MinaUtil.VMS_HEARTBEAT_COUNT_ATTRIBUTE, heartCounter);if(heartCounter > MinaUtil.VMS_HEARTBEAT_MAX) {session.setAttribute(MinaUtil.VMS_HEARTBEAT_COUNT_ATTRIBUTE, 0);heartCounterHandler(session);}}// 定时查询任务private void heartCounterHandler(IoSession session) {// 定期查询 设备状态VMSMessage message = new VMSMessage();message.setMessageContents(new VMS_0F());session.write(message);}// 判断是不是心跳包private boolean isHeartPage(Object message) {vmsMessage = (VMSMessage) message;if (vmsMessage.getMessageContents() instanceof VMS_00) {return true;}return false;}
}

http://chatgpt.dhexx.cn/article/7wUBJyoj.shtml

相关文章

traceroute的工作原理

MyySophia5个月前 traceroute的工作原理 是利用ICMP差错控制报文中的TTL超时会回向源点发送一个时间超时报文。例如A 主机 traceroute B主机&#xff0c;A会封装一些分组&#xff0c;这些分组很特殊&#xff0c;例如第一个分组的TTL设置为1 &#xff0c;第二个分组的TTL设置为…

TCPTRACE的使用说明

主要的配置项介绍 Listen on Port &#xff1a;本机监听的端口&#xff0c;后面请求服务时使用&#xff1b; Destination Server &#xff1a;目标服务器的地址 Destination Port &#xff1a;目标端口 完成后&#xff0c;在地址栏中输入 http://127.0.0.1:8080(或 http://loca…

tcptrace

http://download.csdn.net/tag/tcptrace TcpTrace 0.8.1.717 http://www.soft82.com/download/windows/tcptrace/ 用tcpTrace查看SOAP请求/应答消息 使用说明 12. 五月 2009 23:17 by 螵蛸in Server技术 // Tags: tcpTrace, SOAP // 评论 (0) 对于开发Web服务应用…

tracert/traceroute原理

一、路由追踪程序traceroute/tracert Traceroute是Linux和Mac OS等系统默认提供的路由追踪小程序&#xff0c;Tracert是Windows系统默认提供的路由追踪小程序。二者的功能相同&#xff0c;都能探测数据包从源地址到目的地址经过的路由器的IP地址。Traceroute/Tracert的实现都借…

tcpTrace的使用

tcpTrace是一款小巧的获取请求报文和响应报文的工具&#xff0c;使用非常简单。 比如我现在有一个服务地址是http://localhost:8080/springmvc/handle.do,那么我们在tcpTrace中的配置如下&#xff1a; listen on port&#xff1a;8081&#xff08;这个端口号可以自己设定&…

Tracetcp/Tcptrace的使用

Tracetcp是一个类似于Tracert的工具&#xff0c;可以直接在命令后加端口进行指定端口测试。 使用Tracetcp要求 1. 安装winpcap &#xff0c; 下载链接&#xff1a;https://www.winpcap.org/install/ 2.下载tracetcp软件&#xff0c;下载链接&#xff1a; https://github.com…

traceroute详解

traceroute详解 1.traceroute基本概念 traceroute (Windows系统下是tracert) 命令利用ICMP 协议定位您的计算机和目标计算机之间的所有路由器。TTL值可以反映数据包经过的路由器或网关的数量&#xff0c;通过操纵独立ICMP呼叫报文的TTL值和观察该报文被抛弃的返回信息&#x…

如何使用TCP Traceroute

与发送UDP或ICMP ECHO数据包的传统跟踪路由不同&#xff0c;TCP跟踪路由使用TCP数据包&#xff0c;因此可以绕过最常见的防火墙过滤器。 请遵循以下说明以运行TCP Traceroute&#xff1a; 对于Windows用户对于Mac用户对于Linux用户 对于Windows用户&#xff0c; Windows没有…

TCP/IP 网络:Traceroute程序

Traceroute是一个用来探索TCP/IP协议的工具&#xff0c;他通过ICMP协议可以让我们看到IP数据报从一台主机传送到另一台主机所经过的所有路由。 使用方法&#xff1a; traceroute [参数] [主机名] windows下命令为 tracert [] [] [-n]:显示的地址是用数字表示而不是符号[-v]…

hdfs创建文件报错 mkdir: Cannot create directory /Flink. Name node is in safe mode.

据资料是说hdfs刚刚启动&#xff0c;还在验证和适配&#xff0c;所以进入安全模式&#xff0c;等一会儿就好了&#xff0c;然后我等了几分钟并没有好 然后找到了解决安全模式的办法&#xff1a; 用户可以通过dfsadmin -safemode value 来操作安全模式&#xff0c;参数value…

network_tcp三次握手

TCP是TCP/IP的传输层控制协议&#xff0c;提供可靠的连接服务&#xff0c;采用三次握手确认建立一个连接: 首先需要了解几个名词&#xff1a;tcp标志位,有6种分别为&#xff1a;SYN(synchronous建立联机) 、ACK(acknowledgement 确认) 、PSH(push传送) 、FIN(finish结束)、 RS…

mysql1396错误

波尔&#xff0c;被控制的电脑通讯端口是多少&#xff1f; 1222nervSNIR&Dnetwork 1239nmsdNMSD 1243Sub-7木马 1245Vodoo 1248hermes 1269MavericksMatrix 1492FTP99CMP(BackOriffice.FTP) 1509StreamingServer 1524ingreslock后门 1313bmc_patroldb 1314pdps 1321pipPIP …

NetFlow

&#xfeff;&#xfeff; NetFlow是一种数据交换方式。Netflow提供网络流量的会话级视图&#xff0c;记录下每个TCP/IP事务的信息。也许它不能象tcpdump那样提供网络流量的完整记录&#xff0c;但是当汇集起来时&#xff0c;它更加易于管理和易读。Netflow由Cisco创造。 工作原…

failed to create network error response from daemon filed to setup ip tables问题

问题 今天在环境上搭建平台&#xff0c;执行docker-compose up -d 报错 Error response from daemon: Failed to Setup IP tables: Unable to enable SKIP DNAT rule: (iptables failed: iptables --wait -t nat -I DOCKER -i br-b649822bbcff -j RETURN: iptables: No chai…

Devtools 热部署

文章目录 前言使用步骤 1.引入库2.配置总结 前言 在实际开发过程中&#xff0c;每次修改代码就得将项目重启&#xff0c;重新部署&#xff0c;对于一些大型应用来说&#xff0c;重启时间需要花费大量的时间成本。对于一个后端开发者来说&#xff0c;重启过程确实很难受啊 一、使…

内网渗透的那些net命令|Net config|Net

Net命令 Net命令是一个命令行命令,Net命令有很多函数用于实用和核算计算机之间的NetBIOS连接,可以查看我们的管理网络环境,服务,用户,登陆等信息内容 Net使用方法 显示当前域的计算机列表net view 查看指定计算机的共享资源列表net view \test 查看共享的资源net share 查看…

failed to load response dataRequest content was evicted from inspector cache

在项目中&#xff0c;我用谷歌浏览器查看后台返回的json数据&#xff0c;但是发现前端页面已经接收成功&#xff0c;并且渲染了对应json数据了&#xff0c;但是network里面的response却报错&#xff1a; 调整对应json数据后发现&#xff0c;当后台返回前端的数据超过了一定大…

火狐浏览器提示响应已被截断(有效解决)

产生问题如下&#xff1a;JSON传递数据超过1M 解决方案&#xff1a; 第一步&#xff1a;地址栏输入about:config 第二步&#xff1a;devtools.netmonitor.responseBodyLimit 改为0&#xff0c;相当于禁用大小限制&#xff0c;保存之后即可。

failed to load response data:Request content was evicted from inspector cache

在项目中&#xff0c;我用谷歌浏览器查看后台返回的json数据&#xff0c;但是发现前端页面已经接收成功&#xff0c;并且渲染了对应json数据了&#xff0c;但是network里面的response却报错&#xff1a; 调整对应json数据后发现&#xff0c;当后台返回前端的数据超过了一定大…

必须做作业三:Network-Monitor观察者模式解析

一、总述 观察者模式,由观察者和被观察对象组成,java已经提供了相关类供我们开发者调用! 当数据变化时,Observable会通知集合里的所有观察者对象!具体在数据变化后,app调用Observable的notifyObservers方法,那么 集合里的所有Observer的update()会被执行! 设计其实很简单&#…