Netty框架模型
NIO 的类库和API繁杂,使用麻烦:需要熟练掌握Selector、ServerSocket、ChannelSocketChannel、 ByteBuffer等。开发工作量和难度都非常大: 例如客户端面临断连重连、 网络闪断、心跳处理、半包读写、 网络拥塞和异常流的处理等等。
Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了上述问题。且Netty拥有高性能、 吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。
Netty 现在都在用的是4.x,5.x版本已经废弃,Netty 4.x 需要JDK 6以上版本支持。
Netty的使用场景:
1)互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用Netty 作为基础通信组件,用于实现。各进程节点之间的内部通信。Rocketmq底层也是用的Netty作为基础通信组件。
2)游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。
3)大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。
Netty线程模型
模型解释
- Netty抽象2个线程池,Boss Group和Work Group。Boss Group用于处理Accept事件,Work Group 用于处理 Read 和Write事件。
- Boss Group和Work Group都属于EventLoopGroup。
- NioEventLoopGroup 相当于一个事件循环线程组, 这个组中含有多个事件循环线程 , 每一个事件循环线程是NioEventLoop 。
- 每一个NioEventLoop都会有一个Selector用于监听绑定的Chanel
- Boss NioEventLoop的处理流程如下:
a. 进行accpet操作,返回SocketChanel
b.将该Socketchanel注册到某个work NioEventLoop的seletor上
c.处理任务队列的任务 , 即runAllTasks - Work NioEventLoop的处理流程如下:
a. 轮询所有注册到该Loop Seletor上的Socketchanel的Read/Write事件
b.处理Read/Write事件
c.runAllTasks处理任务队列TaskQueue的任务 ,一些耗时的业务处理一般可以放入
Netty 实战
写一个聊天室程序,服务端具备上线检测,群发聊天内容等。Netty极大的简化了NIO编程,开发者更多的是编写ChannelHandle逻辑做业务处理
服务端代码 Server
package com.qinghaihu.chat;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;/**
* @ClassName ChatServer
* @Description TODO
* @Author:Zhang Lianzhong
* @Date 2020/11/22 10:13 下午
* @Version 1.0
**/
public class ChatServer {public void openServer(int port){ServerBootstrap bootstrap = new ServerBootstrap();EventLoopGroup boss = new NioEventLoopGroup(1); //create boss group, threadpool size is 1EventLoopGroup work = new NioEventLoopGroup(5); //create work group, threadpool size is 5bootstrap.group(boss,work); //组合netty组件//配置handle组件bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("encoder",new StringEncoder());ch.pipeline().addLast("decoder",new StringDecoder());ch.pipeline().addLast(new ServerChanelHandle());}});bootstrap.channel(NioServerSocketChannel.class);try{ChannelFuture channel = bootstrap.bind(port).sync();System.out.println(("服务端已启动,绑定端口:" + port));channel.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally {boss.shutdownGracefully();work.shutdownGracefully();}}public static void main(String[] args){ChatServer server = new ChatServer();server.openServer(8090);}}
服务端ChannelHandle
package com.qinghaihu.chat;import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;/*** @ClassName ServerChanelHandle* @Description TODO* @Author:Zhang Lianzhong* @Date 2020/11/22 10:21 下午* @Version 1.0**/
public class ServerChanelHandle extends SimpleChannelInboundHandler {//必须定义为类成员变量。每个客户端连接时,都会new ChatServerHandler。static保证数据共享public static ChannelGroup cg = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel ch = ctx.channel();for(Channel chanel: cg){chanel.writeAndFlush(ch.remoteAddress()+"进来啦!");}cg.add(ch);}/*** 上线处理* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();for(Channel ch: cg){ch.writeAndFlush(channel.remoteAddress()+"上线啦");}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {Channel channel = ctx.channel();for(Channel ch: cg){if(channel ==ch){ch.writeAndFlush("我说"+msg);}else {ch.writeAndFlush(channel.remoteAddress()+"说:"+msg);}}}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();cg.remove(channel);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();String customerAddress = channel.remoteAddress().toString();for(Channel ch:cg){ch.writeAndFlush("客户端" + customerAddress + "下线了!");}}}
客户端Client
package com.qinghaihu.chat;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;import java.util.Scanner;/*** @ClassName ChatClient* @Description TODO* @Author:Zhang Lianzhong* @Date 2020/11/22 10:54 下午* @Version 1.0**/
public class ChatClient implements Runnable{private String serverIP;private int port;public ChatClient(String serverIp,int port ){this.serverIP = serverIp;this.port = port;}@Overridepublic void run() {Bootstrap bootstrap = new Bootstrap();EventLoopGroup work = new NioEventLoopGroup(1);bootstrap.group(work);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("encode",new StringEncoder());ch.pipeline().addLast("decode",new StringDecoder());ch.pipeline().addLast(new ClientChanelHandle());}});bootstrap.channel(NioSocketChannel.class);ChannelFuture channelFuture = bootstrap.connect(serverIP,port);Channel channel = channelFuture.channel();Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String sendMsg = scanner.nextLine();channel.writeAndFlush(sendMsg);}work.shutdownGracefully();}public static void main(String[] args){new Thread(new ChatClient("127.0.0.1",8090)).start();}
}
客户端ChannelHandle
package com.qinghaihu.chat;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;/*** @ClassName ClientChanelHandle* @Description TODO* @Author:Zhang Lianzhong* @Date 2020/11/22 11:08 下午* @Version 1.0**/
public class ClientChanelHandle extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);}
}
运行效果