实现多协议,多编解码器详解
- 前言
- netty中handler的执行顺序和条件(重要)
- 执行顺序
- 入栈handler介绍
- 出栈handler介绍
- 入栈handler处理器顺序
- 出栈handler处理器顺序
- 执行条件
- 编码演示
- 客户端
- 服务端
- 编解码器异常继续传递消息(扩展)
前言
先讲一下场景,我现在有一个需求,需要传递对象和字符串,其中对象要用protobuf来序列化进行通信,所以,这就产生了两个协议,一个字符串,一个protobuf,那么想要发送和接收这些消息,就需要具备字符串的编解码器和protobuf的编解码器。
当然,你可以说把对象序列化成json字符串传递,实现统一,这样也可以,但是不在本文讨论范围,其次,json传递效率是比较低的,还有就是全部转成json之后,在接收方进行解码判断的时候就比较麻烦了。
言归正传,接下来我们就开始正文讲解,为了防止粘包和拆包,我们需要再加一个分隔符的编解码器,这样下来,就会出现三个编解码器了,分隔符暂定用@。
netty中handler的执行顺序和条件(重要)
在正文开始之前,咱们先弄清楚handler执行顺序和条件,否则,接下来的内容,看似明白,实则还是无法融汇变通,举一反三,博主就是因为一开始没有先去理解这个概念,导致做了无数次试验,浪费了极多的时间。
执行顺序
handler执行顺序又分为入栈和出栈顺序,分别介绍一下,先提供一下接下来整个实践项目要用到的handler,方便下边讲解。

入栈handler介绍

凡是实现这个接口这就是入栈handler,比如我们常用的SimpleChannelInboundHandler、ChannelInboundHandlerAdapter等都是实现了ChannelInboundHandler的,包括我们常用的解码器,比如StringDecoder、DelimiterBasedFrameDecoder等也都是实现了此接口。
出栈handler介绍

我们常用的编码器Encoder,比如StringEncoder等都是实现了此接口ChannelOutboundHandler的。
总结就是,入栈handler就是专门用来拦截处理接收进来的消息,出栈handler就是专门用来拦截处理要发送出去的消息,例如我们常用的 ctx.writeAndFlush(“xxx”)就是要发送的消息,而各种编码器就是出栈handler。
入栈handler处理器顺序
为了方便讲解,我会将上述的一堆handler以出栈和入栈的类型拆分出来,进行图解,因为消息入栈是绝对不会经过出栈的handler的,反之亦然。

从上图可以看出来,入栈handler处理器顺序是从上到下,依次执行,至于为什么有两个图,而且有的handler被跳过去了,先不用管,这属于handler的执行条件内容,后边会讲到。
出栈handler处理器顺序

从上边两个图可以看出,出栈顺序是按照从下往上,依次进行的,至于为什么有两个图,这里就涉及到另一个知识点了:
1、
ctx.writeAndFlush只会从当前的handler位置开始,往前找ChannelOutboundHandler执行;
2、ctx.pipeline().writeAndFlush与ctx.channel().writeAndFlush会从最开始的的位置,往前找ChannelOutboundHandler执行。
所以,用到本文的项目中,就会是这样的效果:

看完这个,可能大家就有疑问了,为什么消息走到ProtostuffEncoder就直接出去了,没有走上边的两个编码器呢,这就涉及到入栈和出栈执行条件的问题了。
执行条件
说明上边的问题之前,我们先来看一下handler发送的消息类型和各个编码器的具体实现。
handler发送的消息类型:

各编码器的实现:


注:String实现了CharSequence接口,所以泛型也属于String。
从上边四个图可以看到,最初发送的消息是ComputeResult对象,那么显然,只会有泛型为ComputeResult的ProtostuffEncoder编码器接收到了消息,其它两个因为泛型不匹配,所以无法接收消息,接着ProtostuffEncoder编码器进行一顿操作之后将消息以ByteBuf的方式写出去,但是其它两个编码器还是因为泛型不匹配无法接收处理,所以最终出栈。
总结:无论是出栈handler,还是入栈handler,具体顺序怎么执行,是需要两个因素来决定,一个是当前handler所在的位置,二个就是当前handler的泛型,只有这两个条件同时满足,才会拦截到消息。
另外,格外提醒:一个handler接收到消息的泛型,一定是上个handler处理之后发送出来的泛型,而不是最初的handler发送的类型,就比如上边的案例,开始发送的是ComputeResult对象,经过ProtostuffEncoder编码器之后就变成了ByteBuf,如果后边有一个支持ByteBuf泛型的handler,那么就会拦截到这个消息了。
如果不标明泛型,那么泛型默认为object,比如这个解码器:

所以一般为了逻辑清晰,提高效率,防止被不需要的handler拦截,最好提前规定好每个handler的泛型,但是也不是一定,根据实际情况而定吧。
编码演示
protostuff自定义编码器参考上一篇文章:netty之Protostuff序列化协议。
由于经过
另外,对于protostuff解码器,请参考最下方扩展部分,否则字符串消息无法传递到下一个string解码器。
客户端
/*** @author: zhouwenjie* @description: 客户端* @create: 2020-04-03 17:14**/
@Component
@Slf4j
public class NettyClient {@Value("${monitor.server.host}")private String host;@Value("${monitor.server.port}")private int port;@Value("${monitor.delimiter}")private String delimiter;@Autowiredprivate NettyClientHandler nettyClientHandler;private NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();private Bootstrap bootstrap;@PostConstructpublic void run() throws UnknownHostException {bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).remoteAddress(host, port).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) {//客户端初始化socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024*8, Unpooled.wrappedBuffer(delimiter.getBytes())));socketChannel.pipeline().addLast(new ProtostuffDecoder(ComputeResult.class));socketChannel.pipeline().addLast(new ProtostuffEncoder(delimiter));socketChannel.pipeline().addLast(nettyClientHandler);}});String hostAddress = InetAddress.getLocalHost().getHostAddress();// 指定本机ip端口,用来给服务端区分,指定端口,重启客户端会等两分钟才能连接上服务端bootstrap.localAddress(hostAddress,0);//连接netty服务器reconnect();}/*** 功能描述: 断线重连,客户端有断线重连机制,就更不能使用异步阻塞了* @param* @return void* @author zhouwenjie* @date 2021/3/19 14:53*/public void reconnect() {ChannelFuture channelFuture = bootstrap.connect();//使用最新的ChannelFuture -> 开启最新的监听器channelFuture.addListener((ChannelFutureListener) future -> {if (future.cause() != null) {log.error("连接失败。。。");future.channel().eventLoop().schedule(() -> reconnect(), 3, TimeUnit.SECONDS);} else {log.info("客户端连接成功。。。");}});}/*** 关闭 client*/@PreDestroypublic void shutdown() {// 优雅关闭 EventLoopGroup 对象eventLoopGroup.shutdownGracefully();log.info("[*Netty客户端关闭]");}
}
服务端
NettyServerChannelInitializer
/*** @author: zhouwenjie* @description: 配置管道 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器* @create: 2020-04-03 14:14**/
@Component
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {@Value("${monitor.delimiter}")private String delimiter;@Autowiredprivate NettyServerHandler nettyServerHandler;@Autowiredprivate HeartBeatServerHandler heartBeatServerHandler;@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();//解码器,拦截消息去掉分隔符pipeline.addLast(new DelimiterBasedFrameDecoder(1024*8, Unpooled.wrappedBuffer(delimiter.getBytes())));//解码器,反序列化Protostuff消息为实际对象pipeline.addLast(new ProtostuffDecoder(ComputeResult.class));//解码器,字符串解码pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));//编码器,字符串编码pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));//编码器,在原消息后添加分隔符pipeline.addLast(new DelimiterBasedFrameEncoder(delimiter));//心跳处理器,处理心跳字符串消息pipeline.addLast(heartBeatServerHandler);//编码器,将对象进行Protostuff序列化pipeline.addLast(new ProtostuffEncoder(delimiter));//核心处理器,处理连接和消息的业务hanlderpipeline.addLast(nettyServerHandler);}
DelimiterBasedFrameEncoder
public class DelimiterBasedFrameEncoder extends MessageToMessageEncoder<String> {private String delimiter;public DelimiterBasedFrameEncoder(String delimiter) {this.delimiter = delimiter;}@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) {// 在响应的数据后面添加分隔符out.add(msg + delimiter);}
}
NettyServer
/*** @author: zhouwenjie* @description: netty启动配置类* @create: 2020-04-03 11:43**/
@Slf4j
@Component
public class NettyServer {@Autowiredprivate NettyServerChannelInitializer nettyServerChannelInitializer;private NioEventLoopGroup bossGroup;private NioEventLoopGroup workerGroup;@PostConstructpublic void start() {//创建接收请求和处理请求的实例(默认线程数为 CPU 核心数乘以2也可自定义)bossGroup = new NioEventLoopGroup(1);workerGroup = new NioEventLoopGroup(2);try {//创建服务端启动辅助类(boostrap 用来为 Netty 程序的启动组装配置一些必须要组件,例如上面的创建的两个线程组)ServerBootstrap socketBs = new ServerBootstrap();//channel 方法用于指定服务器端监听套接字通道//socket配置socketBs.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)//ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小.option(ChannelOption.SO_BACKLOG, 1000).childHandler(nettyServerChannelInitializer);//默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。//.childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future = socketBs.bind(8688).sync();future.addListener(future1 -> log.info("Netty服务端启动成功"));} catch (InterruptedException e) {e.printStackTrace();}}@PreDestroypublic void shutdown() {// 优雅关闭两个 EventLoopGroup 对象bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();log.info("[*Netty服务端关闭成功]");}
}
HeartBeatServerHandler
/*** @author: zhouwenjie* @description: 心跳检测处理器* @create: 2022-03-25 16:12**/
@Slf4j
@Component
@ChannelHandler.Sharable
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {@Overridepublic void channelRead0(ChannelHandlerContext ctx, String msg) {// 接收到心跳请求,打印心跳消息,否则进入下一处理流程try {if ("ping".equals(msg)) {ctx.writeAndFlush("pong");log.info("[server] Receive client heart beat message : ----> {}", msg);}} catch (Exception e) {e.printStackTrace();}}/*** 功能描述: 心跳检测** @param ctx 这里的作用主要是解决断网,弱网的情况发生* @param evt* @return void* @author zhouwenjie* @date 2020/4/3 17:02*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {String socketString = ctx.channel().remoteAddress().toString();if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {log.info("Client RSU: " + socketString + " READER_IDLE 读超时");ctx.disconnect();}}}/*** 在处理过程中引发异常时被调用** @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("[server] heart response error: {}", cause.getMessage());ctx.fireExceptionCaught(cause);}
}
NettyServerHandler
/*** @author: zhouwenjie* @description: 服务端业务处理类* @create: 2020-04-03 14:13**/
@Slf4j
@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<ComputeResult> {public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 连接成功*/@Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println("客户端连接");clients.add(ctx.channel());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.error("[*The netty server suspends service...]");super.channelInactive(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);log.error("[* Netty connection exception]:{}", cause.toString());cause.printStackTrace();}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ComputeResult computeResult) throws Exception {System.out.println(computeResult);ctx.writeAndFlush(computeResult);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {String socketString = ctx.channel().remoteAddress().toString();if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {System.out.println("Client: " + socketString + " READER_IDLE 读超时");ctx.disconnect();}}}
}
好啦,这个明白了之后,根据自己的业务扩展起来就简单多了吧,只要把握好顺序和泛型,再复杂的编解码也能轻松搞定啦。
编解码器异常继续传递消息(扩展)
这里主要介绍,怎么在一个handler(编码或者解码)失败后,将消息传递到下一个编解码器。
/*** @author: zhouwenjie* @description:* @create: 2022-07-12 11:17**/
public class ProtostuffDecoder<T> extends ByteToMessageDecoder {private Class<T> clazz;public ProtostuffDecoder(Class<T> clazz) {this.clazz = clazz;}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {try {byte[] body = new byte[in.readableBytes()]; //传输正常in.readBytes(body);out.add(ProtostuffUtils.deserialize(body, clazz));} catch (Exception e) {e.printStackTrace();}}
}
如上代码,如果在 out.add(ProtostuffUtils.deserialize(body, clazz));代码出错,怎么才能让消息继续传递给下一个解码器呢?
加下边的代码即可:
/*** @author: zhouwenjie* @description:* @create: 2022-07-12 11:17**/
public class ProtostuffDecoder<T> extends MessageToMessageDecoder<ByteBuf>{private Class<T> clazz;public ProtostuffDecoder(Class<T> clazz) {this.clazz = clazz;}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {try {byte[] body = new byte[in.readableBytes()]; //传输正常in.readBytes(body);out.add(ProtostuffUtils.deserialize(body, clazz));} catch (Exception e) {// 重置读取字节索引,因为上边已经读了(readBytes),不加这个会导致数据为空in.resetReaderIndex();// 这里是复制流,复制一份,防止skipBytes跳过,导致传递的消息变成空;//同时还解决引用计数器为0的异常:refCnt: 0, decrement: 1。ByteBuf buffer = in.retainedDuplicate();//解决 decode() did not read anything but decoded a message的异常//原因是netty不允许有字节内容不读的情况发生,所以采用下边的方法解决。in.skipBytes(in.readableBytes());out.add(buffer);}}
}



















