netty实现多协议,多编解码器

article/2025/11/11 2:38:58

实现多协议,多编解码器详解

  • 前言
  • netty中handler的执行顺序和条件(重要)
    • 执行顺序
      • 入栈handler介绍
      • 出栈handler介绍
      • 入栈handler处理器顺序
      • 出栈handler处理器顺序
    • 执行条件
  • 编码演示
    • 客户端
    • 服务端
  • 编解码器异常继续传递消息(扩展)

前言

先讲一下场景,我现在有一个需求,需要传递对象和字符串,其中对象要用protobuf来序列化进行通信,所以,这就产生了两个协议,一个字符串,一个protobuf,那么想要发送和接收这些消息,就需要具备字符串的编解码器和protobuf的编解码器。
当然,你可以说把对象序列化成json字符串传递,实现统一,这样也可以,但是不在本文讨论范围,其次,json传递效率是比较低的,还有就是全部转成json之后,在接收方进行解码判断的时候就比较麻烦了。
言归正传,接下来我们就开始正文讲解,为了防止粘包和拆包,我们需要再加一个分隔符的编解码器,这样下来,就会出现三个编解码器了,分隔符暂定用@。

netty中handler的执行顺序和条件(重要)

在正文开始之前,咱们先弄清楚handler执行顺序和条件,否则,接下来的内容,看似明白,实则还是无法融汇变通,举一反三,博主就是因为一开始没有先去理解这个概念,导致做了无数次试验,浪费了极多的时间。

执行顺序

handler执行顺序又分为入栈出栈顺序,分别介绍一下,先提供一下接下来整个实践项目要用到的handler,方便下边讲解。
在这里插入图片描述

入栈handler介绍

在这里插入图片描述
凡是实现这个接口这就是入栈handler,比如我们常用的SimpleChannelInboundHandler、ChannelInboundHandlerAdapter等都是实现了ChannelInboundHandler的,包括我们常用的解码器,比如StringDecoder、DelimiterBasedFrameDecoder等也都是实现了此接口。

出栈handler介绍

在这里插入图片描述
我们常用的编码器Encoder,比如StringEncoder等都是实现了此接口ChannelOutboundHandler的。

总结就是,入栈handler就是专门用来拦截处理接收进来的消息,出栈handler就是专门用来拦截处理要发送出去的消息,例如我们常用的 ctx.writeAndFlush(“xxx”)就是要发送的消息,而各种编码器就是出栈handler。

入栈handler处理器顺序

为了方便讲解,我会将上述的一堆handler以出栈和入栈的类型拆分出来,进行图解,因为消息入栈是绝对不会经过出栈的handler的,反之亦然。
在这里插入图片描述

从上图可以看出来,入栈handler处理器顺序是从上到下,依次执行,至于为什么有两个图,而且有的handler被跳过去了,先不用管,这属于handler的执行条件内容,后边会讲到。

出栈handler处理器顺序

在这里插入图片描述

从上边两个图可以看出,出栈顺序是按照从下往上,依次进行的,至于为什么有两个图,这里就涉及到另一个知识点了:

1、ctx.writeAndFlush只会从当前的handler位置开始,往前找ChannelOutboundHandler执行;
2、ctx.pipeline().writeAndFlushctx.channel().writeAndFlush会从最开始的的位置,往前找ChannelOutboundHandler执行。

所以,用到本文的项目中,就会是这样的效果:
在这里插入图片描述

看完这个,可能大家就有疑问了,为什么消息走到ProtostuffEncoder就直接出去了,没有走上边的两个编码器呢,这就涉及到入栈和出栈执行条件的问题了。

执行条件

说明上边的问题之前,我们先来看一下handler发送的消息类型和各个编码器的具体实现。

handler发送的消息类型:
在这里插入图片描述

各编码器的实现:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述注:String实现了CharSequence接口,所以泛型也属于String。
从上边四个图可以看到,最初发送的消息是ComputeResult对象,那么显然,只会有泛型为ComputeResult的ProtostuffEncoder编码器接收到了消息,其它两个因为泛型不匹配,所以无法接收消息,接着ProtostuffEncoder编码器进行一顿操作之后将消息以ByteBuf的方式写出去,但是其它两个编码器还是因为泛型不匹配无法接收处理,所以最终出栈。

总结:无论是出栈handler,还是入栈handler,具体顺序怎么执行,是需要两个因素来决定,一个是当前handler所在的位置,二个就是当前handler的泛型,只有这两个条件同时满足,才会拦截到消息。

另外,格外提醒:一个handler接收到消息的泛型,一定是上个handler处理之后发送出来的泛型,而不是最初的handler发送的类型,就比如上边的案例,开始发送的是ComputeResult对象,经过ProtostuffEncoder编码器之后就变成了ByteBuf,如果后边有一个支持ByteBuf泛型的handler,那么就会拦截到这个消息了。

如果不标明泛型,那么泛型默认为object,比如这个解码器:
在这里插入图片描述

所以一般为了逻辑清晰,提高效率,防止被不需要的handler拦截,最好提前规定好每个handler的泛型,但是也不是一定,根据实际情况而定吧。

编码演示

protostuff自定义编码器参考上一篇文章:netty之Protostuff序列化协议。
由于经过
另外,对于protostuff解码器,请参考最下方扩展部分,否则字符串消息无法传递到下一个string解码器。

客户端

/*** @author: zhouwenjie* @description: 客户端* @create: 2020-04-03 17:14**/
@Component
@Slf4j
public class NettyClient {@Value("${monitor.server.host}")private String host;@Value("${monitor.server.port}")private int port;@Value("${monitor.delimiter}")private String delimiter;@Autowiredprivate NettyClientHandler nettyClientHandler;private NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();private Bootstrap bootstrap;@PostConstructpublic void run() throws UnknownHostException {bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).remoteAddress(host, port).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) {//客户端初始化socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024*8, Unpooled.wrappedBuffer(delimiter.getBytes())));socketChannel.pipeline().addLast(new ProtostuffDecoder(ComputeResult.class));socketChannel.pipeline().addLast(new ProtostuffEncoder(delimiter));socketChannel.pipeline().addLast(nettyClientHandler);}});String hostAddress = InetAddress.getLocalHost().getHostAddress();// 指定本机ip端口,用来给服务端区分,指定端口,重启客户端会等两分钟才能连接上服务端bootstrap.localAddress(hostAddress,0);//连接netty服务器reconnect();}/*** 功能描述: 断线重连,客户端有断线重连机制,就更不能使用异步阻塞了* @param* @return void* @author zhouwenjie* @date 2021/3/19 14:53*/public void reconnect() {ChannelFuture channelFuture = bootstrap.connect();//使用最新的ChannelFuture -> 开启最新的监听器channelFuture.addListener((ChannelFutureListener) future -> {if (future.cause() != null) {log.error("连接失败。。。");future.channel().eventLoop().schedule(() -> reconnect(), 3, TimeUnit.SECONDS);} else {log.info("客户端连接成功。。。");}});}/*** 关闭 client*/@PreDestroypublic void shutdown() {// 优雅关闭 EventLoopGroup 对象eventLoopGroup.shutdownGracefully();log.info("[*Netty客户端关闭]");}
}

服务端

NettyServerChannelInitializer

/*** @author: zhouwenjie* @description: 配置管道  服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器* @create: 2020-04-03 14:14**/
@Component
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {@Value("${monitor.delimiter}")private String delimiter;@Autowiredprivate NettyServerHandler nettyServerHandler;@Autowiredprivate HeartBeatServerHandler heartBeatServerHandler;@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();//解码器,拦截消息去掉分隔符pipeline.addLast(new DelimiterBasedFrameDecoder(1024*8, Unpooled.wrappedBuffer(delimiter.getBytes())));//解码器,反序列化Protostuff消息为实际对象pipeline.addLast(new ProtostuffDecoder(ComputeResult.class));//解码器,字符串解码pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));//编码器,字符串编码pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));//编码器,在原消息后添加分隔符pipeline.addLast(new DelimiterBasedFrameEncoder(delimiter));//心跳处理器,处理心跳字符串消息pipeline.addLast(heartBeatServerHandler);//编码器,将对象进行Protostuff序列化pipeline.addLast(new ProtostuffEncoder(delimiter));//核心处理器,处理连接和消息的业务hanlderpipeline.addLast(nettyServerHandler);}

DelimiterBasedFrameEncoder

public class DelimiterBasedFrameEncoder extends MessageToMessageEncoder<String> {private String delimiter;public DelimiterBasedFrameEncoder(String delimiter) {this.delimiter = delimiter;}@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) {// 在响应的数据后面添加分隔符out.add(msg + delimiter);}
}

NettyServer

/*** @author: zhouwenjie* @description: netty启动配置类* @create: 2020-04-03 11:43**/
@Slf4j
@Component
public class NettyServer {@Autowiredprivate NettyServerChannelInitializer nettyServerChannelInitializer;private NioEventLoopGroup bossGroup;private NioEventLoopGroup workerGroup;@PostConstructpublic void start() {//创建接收请求和处理请求的实例(默认线程数为 CPU 核心数乘以2也可自定义)bossGroup = new NioEventLoopGroup(1);workerGroup = new NioEventLoopGroup(2);try {//创建服务端启动辅助类(boostrap 用来为 Netty 程序的启动组装配置一些必须要组件,例如上面的创建的两个线程组)ServerBootstrap socketBs = new ServerBootstrap();//channel 方法用于指定服务器端监听套接字通道//socket配置socketBs.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)//ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小.option(ChannelOption.SO_BACKLOG, 1000).childHandler(nettyServerChannelInitializer);//默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。//.childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future = socketBs.bind(8688).sync();future.addListener(future1 -> log.info("Netty服务端启动成功"));} catch (InterruptedException e) {e.printStackTrace();}}@PreDestroypublic void shutdown() {// 优雅关闭两个 EventLoopGroup 对象bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();log.info("[*Netty服务端关闭成功]");}
}

HeartBeatServerHandler

/*** @author: zhouwenjie* @description: 心跳检测处理器* @create: 2022-03-25 16:12**/
@Slf4j
@Component
@ChannelHandler.Sharable
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {@Overridepublic void channelRead0(ChannelHandlerContext ctx, String msg) {// 接收到心跳请求,打印心跳消息,否则进入下一处理流程try {if ("ping".equals(msg)) {ctx.writeAndFlush("pong");log.info("[server] Receive client heart beat message : ----> {}", msg);}} catch (Exception e) {e.printStackTrace();}}/*** 功能描述: 心跳检测** @param ctx 这里的作用主要是解决断网,弱网的情况发生* @param evt* @return void* @author zhouwenjie* @date 2020/4/3 17:02*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {String socketString = ctx.channel().remoteAddress().toString();if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {log.info("Client RSU: " + socketString + " READER_IDLE 读超时");ctx.disconnect();}}}/*** 在处理过程中引发异常时被调用** @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("[server] heart response error: {}", cause.getMessage());ctx.fireExceptionCaught(cause);}
}

NettyServerHandler

/*** @author: zhouwenjie* @description: 服务端业务处理类* @create: 2020-04-03 14:13**/
@Slf4j
@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<ComputeResult> {public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 连接成功*/@Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println("客户端连接");clients.add(ctx.channel());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.error("[*The netty server suspends service...]");super.channelInactive(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);log.error("[* Netty connection exception]:{}", cause.toString());cause.printStackTrace();}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ComputeResult computeResult) throws Exception {System.out.println(computeResult);ctx.writeAndFlush(computeResult);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {String socketString = ctx.channel().remoteAddress().toString();if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {System.out.println("Client: " + socketString + " READER_IDLE 读超时");ctx.disconnect();}}}
}

好啦,这个明白了之后,根据自己的业务扩展起来就简单多了吧,只要把握好顺序和泛型,再复杂的编解码也能轻松搞定啦。

编解码器异常继续传递消息(扩展)

这里主要介绍,怎么在一个handler(编码或者解码)失败后,将消息传递到下一个编解码器。

/*** @author: zhouwenjie* @description:* @create: 2022-07-12 11:17**/
public class ProtostuffDecoder<T> extends ByteToMessageDecoder {private Class<T> clazz;public ProtostuffDecoder(Class<T> clazz) {this.clazz = clazz;}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {try {byte[] body = new byte[in.readableBytes()];  //传输正常in.readBytes(body);out.add(ProtostuffUtils.deserialize(body, clazz));} catch (Exception e) {e.printStackTrace();}}
}

如上代码,如果在 out.add(ProtostuffUtils.deserialize(body, clazz));代码出错,怎么才能让消息继续传递给下一个解码器呢?
加下边的代码即可:

/*** @author: zhouwenjie* @description:* @create: 2022-07-12 11:17**/
public class ProtostuffDecoder<T> extends MessageToMessageDecoder<ByteBuf>{private Class<T> clazz;public ProtostuffDecoder(Class<T> clazz) {this.clazz = clazz;}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {try {byte[] body = new byte[in.readableBytes()];  //传输正常in.readBytes(body);out.add(ProtostuffUtils.deserialize(body, clazz));} catch (Exception e) {// 重置读取字节索引,因为上边已经读了(readBytes),不加这个会导致数据为空in.resetReaderIndex();// 这里是复制流,复制一份,防止skipBytes跳过,导致传递的消息变成空;//同时还解决引用计数器为0的异常:refCnt: 0, decrement: 1。ByteBuf buffer = in.retainedDuplicate();//解决 decode() did not read anything but decoded a message的异常//原因是netty不允许有字节内容不读的情况发生,所以采用下边的方法解决。in.skipBytes(in.readableBytes());out.add(buffer);}}
}

http://chatgpt.dhexx.cn/article/kYnBbMwX.shtml

相关文章

什么是802.1q协议

欢迎来到东用知识小课堂&#xff01; 1.VLAN VLAN全称&#xff1a;Virtual Local Area Network&#xff08;虚拟局域网&#xff09;&#xff0c;是将一个物理的LAN在逻辑上划分成多个广播域的通信技术。在数据帧的SMAC字段后添加VLAN标签字段&#xff0c;基于接口、IP、MAC或协…

qi无线充电协议_无线充电Qi专利池正式上线!

展位合作&#xff0c;请联系&#xff1a;infochongdiantou.com 此前&#xff0c;充电头网报道了Philips飞利浦将征收无线充专利费的事件&#xff0c;引发行业关于无线充专利费收取的广泛讨论。1月14日&#xff0c;关于大家关心的无线充专利授权&#xff0c;业界有了最新进展&am…

MQTT - 消息队列遥测传输协议

MQTT&#xff08;Message Queuing Telemetry Transport&#xff0c;消息队列遥测传输协议&#xff09;&#xff0c;是一种基于发布/订阅&#xff08;publish/subscribe&#xff09;模式的"轻量级"通讯协议&#xff0c;该协议构建于TCP/IP协议上&#xff0c;由IBM在19…

无线充qi协议c语言详解,无线充电Qi协议正向通信FSK的解调设计

白光磊 池卿华 王兆俊 江昊 史佳雯 周天 摘 要: 无线充电Qi协议提出发射器和接收器通过频率调制(FSK)方式进行正向通信,进而建立完整的通信状态控制。接收器可采用测宽法进行频率解调,然而由于电磁耦合变化、负载变化、载波占空比变化、测量量化等引起的误差,该方法无法满…

无线充qi协议c语言详解,QI无线充通信协议数据包格式解析

QI通信数据格式编码: 协议规定时钟信号的频率应该是Fclk = 2(4%)KHZ,所以每一位的传输时间约500us,如图所示 数据 0:500us的高电平,或者500us的低电平 数据 1:250us高电平+250us低电平,或者250us低电平+250us高电平 电源接收端(移动设备端)采用 11 位异步串行格式传输数…

无线充电协议Qi 转 UART

前言 最近有个网友联系我&#xff0c;咨询做无线充电的方案&#xff1b; 于是开始评估一下这个方案&#xff1a; 1&#xff1a;无线充电分为供电端无线发射&#xff08;通过MOS开关斩波&#xff0c;使线圈的电流变化&#xff0c;这样线圈对外才会很强的电磁辐射&#xff09;受…

无线充QI协议之TX与RX通讯方式篇

谈到无线充通讯&#xff0c;首先得了解下无线充电的工作原理&#xff0c;在描述中&#xff0c;提供功率的一端称为发射端&#xff0c;接收功率的一端称为接收端&#xff1b; 发射端直流电经过全桥转换为交流电&#xff0c;交流电通过线圈产生磁场&#xff08;电生磁&#xff0…

无线充电原理与QI协议详解

一 、无线充电基本原理 无线充电的基本原理就是我们平时常用的开关电源原理&#xff0c;区别在于没有磁介质耦合&#xff0c;那么我们需要利用磁共振的方式提高耦合效率&#xff0c;具体方法是在发送端和接收端线圈串并联电容&#xff0c;是发送线圈处理谐振状态&#xff0c;接…

Android 项目必备(二十六)-->获取手机中所有 APP

效果图 代码 添加依赖 implementation com.github.CymChad:BaseRecyclerViewAdapterHelper:2.9.30 implementation androidx.recyclerview:recyclerview:1.1.0AppInfo.java import android.graphics.drawable.Drawable;/*** created on 2020/8/3 20:22** author Scarf Gong*…

基于uni-app多平台管理系统模板uniapp-admin

uniapp-admin 2.0.0 重磅发布&#xff01; 更加完善的开发指南主题定制&#xff1a;支持颜色主题和深色模式&#xff0c;页面更加美观国际化/多语言&#xff1a;应用内容和pages.json国际化&#xff0c;支持N种语言引入iconfont&#xff1a;海量字体图标支持引入rap2接口管理平…

AppAnnie——AppStore统计工具

前言 随着iOS开发的流行&#xff0c;针对iOS开发涉及的方方面面&#xff0c;早有一些公司提供了专门的解决方案或工具。这些解决方案或工具包括&#xff1a;用户行为统计工具&#xff08;友盟&#xff0c;Flurry&#xff0c;Google Analytics等), App Store销售分析工具&#x…

Android 启动 应用程序详情AppInfo(AppDetail) -源码分析

在Launcher &#xff08;桌面&#xff09;上&#xff0c;长按应用图标然后点击 右上角的 应用详情 按钮&#xff0c; 将会进入 该 应用的详情 界面。 这个过程将会涉及 Client (Launcher App) -> App API(LauncherApps ) -> Framework API(LauncherAppsService) 下面将以…

AppInfoPro 获取手机应用信息

AppInfoKtx Github Android - 获取手机安装的应用信息&#xff08;用户应用、系统应用&#xff09;、手机信息、屏幕信息&#xff0c;并且支持扫描本地符合指定要求&#xff08;后缀&#xff09;的应用&#xff0c;并且获取该apk信息&#xff0c;支持导出应用信息&#xff08;…

关于Vista的AppInfo服务被禁的问题

昨天参考了这个文章对vista服务进行了大张旗鼓的阉割,冒险地把大部分服务都给卡擦了(不想转贴了,这个文章本身也是转贴的) http://blog.tom.com/pslwap/article/1411.html 然后就出现了郁闷的事情: 重新启动后任何需要提升权限来操作的程序都无法启动,提示"无法启动服务…

Android8.1根据app名字调用显示app的属性页(App info)

https://actionwind.wordpress.com/2022/04/14/android8-1%e6%a0%b9%e6%8d%aeapp%e5%90%8d%e5%ad%97%e8%b0%83%e7%94%a8%e6%98%be%e7%a4%baapp%e7%9a%84%e5%b1%9e%e6%80%a7%e9%a1%b5%ef%bc%88app-info%ef%bc%89/ 如果要让手机显示出app的属性页&#xff1a; 代码如下&#xff1…

vue项目打包部署注意点 + 宝塔面板几步部署项目

1.vue项目打包 1.1 终端运行打包命令 在编辑器的终端运行vue项目打包命令 yarn run build打包成功如下&#xff1a; 这时我们可以看到项目目录多出来一个dist文件夹&#xff0c;记住它&#xff0c;后面部署就靠它了。 1.2 修改配置 就我个人部署经历(宝塔面板快速部署)来…

vue项目打包部署到tomcat服务器

总结&#xff1a; 修改vue项目相关配置&#xff0c;cmd进入到vue项目文件夹中&#xff0c;执行npm run bulid命令&#xff0c;将生成的dist文件夹下的内容&#xff0c;存放到Tomcat中的webapps新建文件夹下&#xff0c;运行Tomcat服务器&#xff0c;通过 IP地址:端口/新建文件…

vue项目打包部署-手把手教程

vue项目打包部署 1.购买服务器 可选阿里云/腾讯云/华为云 等等… 购买时选择镜像,我们这里以CentOS为例 2.配置服务器 2.1 安装FinalShell ​ 需要本地使用一些软件来操作服务器,例如:FinalShell / Xshell … ​ 我这里使用的是FinalShell,安装好以后,打开软件与建立链接…

Java和Vue项目打包并进行服务器部署

两周前我刚刚入职实习的时候&#xff0c;后端的几个同事看到我需要学习如何把项目进行部署&#xff0c;都围过来教我怎么部署&#xff0c;我感觉学习到了很多&#xff0c;因此&#xff0c;记录一下学习的笔记。 当然了&#xff0c;这些部署是建立在已经配置好tomcat&#xff08…

手把手教你如何把vue项目打包后部署到服务器(小白教程)

一.需要用到的工具 vscode 下载链接&#xff1a;Visual Studio Code - Code Editing. Redefined FinalShell 下载链接&#xff1a;FinalShell官网 二.打包步骤 1.vscode打开你的vue项目-- >点终端 -- >输入npm run build 按回车进行打包&#xff1b; 2.打包成功 , 生…