1.什么是netty
Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序,是目前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
2.Netty的优点
直接使用 JDK 自带的 NIO 存在类库和 API 繁杂、开发工作量和难度大等问题。Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了这些问题,主要优点如下:
1.设计优雅:为各种传输类型(阻塞和非阻塞 Socket)提供统一的 API;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型——单线程、一个或多个线程池。
2.使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。
3.高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
4.安全:完整的 SSL/TLS 和 StartTLS 支持。
5.社区活跃、不断更新:版本迭代周期短,发现的 Bug 可以被及时修复,同时更多的新功能会被加入。
下面是代码部分:
服务端:
Manger.java
package zmc.com.Server;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * In-memory registry of the friend ids recorded for each client id.
 *
 * <p>All state lives in a static map, so every caller in the JVM shares the same registry.
 * Both methods may be called concurrently.
 */
public class Manger {

    /** Maps a client id to the set of friend ids added for that client. */
    private static final Map<String, Set<String>> friend = new ConcurrentHashMap<>();

    /**
     * Records {@code friendId} as a friend of {@code clientId}. Adding the same pair twice
     * has no further effect.
     *
     * @param clientId id of the client that owns the friend list; must not be {@code null}
     * @param friendId id to add to that client's friend list; must not be {@code null}
     */
    public static void addFriend(String clientId, String friendId) {
        // computeIfAbsent creates the set atomically, so two threads adding the first friend
        // of the same client can no longer overwrite each other's set; the concurrent key set
        // makes the add itself safe as well.
        friend.computeIfAbsent(clientId, id -> ConcurrentHashMap.newKeySet()).add(friendId);
    }

    /**
     * Returns the friends recorded for {@code clientId}.
     *
     * @param clientId id of the client to look up; must not be {@code null}
     * @return a new, independent list of friend ids (empty when none were recorded);
     *         the order of the elements is unspecified
     */
    public static List<String> list(String clientId) {
        Set<String> friendList = friend.get(clientId);
        if (friendList == null) {
            return new ArrayList<>();
        }
        // Copy so that callers cannot modify the registry through the returned list.
        return new ArrayList<>(friendList);
    }
}
ServerHandler.java
package zmc.com.Server;import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import zmc.com.MMessage.MyMessage;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;public class ServerHandler extends SimpleChannelInboundHandler<MyMessage> {//所有的channel存入map集合中,目的是为了私聊好获取用户private static Map<String, Channel> allChannels = new ConcurrentHashMap<String, Channel>();//格式化所有日期时间private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//转化日期private String currentDate = sdf.format(new Date());@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {//获取当前channelChannel channel = ctx.channel();//推送客户加入聊天的信息推送给其它在线的客户端//该方法会将channelGroup中所有的channel遍历并发送消息allChannels.forEach(new BiConsumer<String, Channel>() {@Overridepublic void accept(String k, Channel ch) {ch.writeAndFlush(MyMessage.StringMessage(currentDate + " \n [客户端]" + channel.remoteAddress() + "加入聊天\n"));}});//获取端口号String key = channel.remoteAddress().toString().split(":")[1];allChannels.put(key, channel);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {//获取当前channelChannel channel = ctx.channel();//推送客户加入聊天的信息推送给其它在线的客户端//该方法会将map中所有的channel遍历并发送消息allChannels.forEach(new BiConsumer<String, Channel>() {@Overridepublic void accept(String k, Channel ch) {ch.writeAndFlush(MyMessage.StringMessage(currentDate + " \n [客户端]" + channel.remoteAddress() + "离线\n"));}});System.out.println("当前在线人数:" + allChannels.size());}@Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println(currentDate + " -- " + ctx.channel().remoteAddress() + "上线~");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();String key = channel.remoteAddress().toString().split(":")[1];allChannels.remove(key);System.out.println(currentDate + " -- " + ctx.channel().remoteAddress() + "离线");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//关闭ctx.close();}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, MyMessage myMessage){//获取到当前channelChannel channel = 
channelHandlerContext.channel();String content = myMessage.getContent().toString();return;}if (content.contains("&")) {String id = content.split("&")[1];String key = channel.remoteAddress().toString().split(":")[1];Manger.addFriend(key, id);Channel userChannel = allChannels.get(id);userChannel.writeAndFlush(myMessage);return;}//循环遍历hashmap集合进行转发消息allChannels.forEach((k, ch) -> {if (channel != ch) {myMessage.setContent(currentDate + " \n [客户端]" + channel.remoteAddress() + ":" + content + "\n");ch.writeAndFlush(myMessage);} else { // 发送消息给自己,回显自己发送的消息myMessage.setContent(currentDate + " \n [我]:" + content + "\n");channel.writeAndFlush(myMessage);}});}
}
Sserver.java
package zmc.com.Server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import zmc.com.MMessage.MyDecoder;
import zmc.com.MMessage.MyEncoder;import java.util.concurrent.TimeUnit;public class Sserver {// 监听端口private int port;public Sserver(int port) {this.port = port;}//编写run方法,处理客户端的请求public void run() {//创建两个线程组EventLoopGroup bossGroup = new NioEventLoopGroup(1);//Nio核数 * 2EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();try {bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//获取pipelineChannelPipeline pipeline = socketChannel.pipeline();//自定义协议pipeline.addLast(new MyEncoder());pipeline.addLast(new MyDecoder());//加入自己的业务处理handlerpipeline.addLast(new ServerHandler());//加入心跳检测机制pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));}});System.out.println("netty 服务器启动");ChannelFuture future = bootstrap.bind(port).sync();//监听关闭事件future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) {Sserver groupChatServer = new Sserver(8001);groupChatServer.run();}
}
客户端:
Cclient.java
package zmc.com.Client;public class Cclient {public static void main(String[] args) {ClientChannel groupChatClient = new ClientChannel("127.0.0.1", 8001);groupChatClient.run();}
}
ClientChannel.java
package zmc.com.Client;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import zmc.com.MMessage.MyMessage;
import zmc.com.MMessage.MyDecoder;
import zmc.com.MMessage.MyEncoder;import java.util.Scanner;public class ClientChannel {//定义属性private final String host;public final int port;public ClientChannel(String host, int port) {this.host = host;this.port = port;}public void run() {EventLoopGroup eventExecutors = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();try {bootstrap.group(eventExecutors).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//得到pipelineChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new MyEncoder());pipeline.addLast(new MyDecoder());//加入自定义handlerpipeline.addLast(new ClientHandler());}});ChannelFuture channelFuture = bootstrap.connect(host, port).sync();//得到channelChannel channel = channelFuture.channel();System.out.println("-----" + channel.localAddress() + "----");//客户端需要输入信息,创建一个扫描器Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String msg = scanner.nextLine();//通过channel发送到服务器端channel.writeAndFlush(MyMessage.StringMessage(msg));}channelFuture.channel().closeFuture().sync();} catch (Exception e) {} finally {eventExecutors.shutdownGracefully();}}}
ClientHandler.java
package zmc.com.Client;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import zmc.com.MMessage.MyMessage;public class ClientHandler extends SimpleChannelInboundHandler<MyMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, MyMessage message) {System.out.printf(message.getContent().toString());}
}
自定义协议部分:
自定义编码:
MyEncoder.java
package zmc.com.MMessage;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;public class MyEncoder extends MessageToByteEncoder<MyMessage> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, MyMessage myMessage, ByteBuf byteBuf) throws Exception {// 魔数byteBuf.writeShort(www.PACKAGE_MAGIC_NUM);// 指令类型byteBuf.writeByte(myMessage.getType().type());// 消息长度和数据内容final Object content = myMessage.getContent();if (content == null) {byteBuf.writeInt(0);} else {ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(content);byte[] bytes = bos.toByteArray();byteBuf.writeInt(bytes.length);byteBuf.writeBytes(bytes);}}
}
自定义解码:
MyDecoder.java
package zmc.com.MMessage;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.List;public class MyDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> messages) throws Exception {final int magic = byteBuf.readShort();if (magic != www.PACKAGE_MAGIC_NUM) {System.out.println(":"+ magic);}final byte opType = byteBuf.readByte();final zzz type = zzz.getOpType(opType);if (type == null) {System.out.println(":"+ opType);return;}MyMessage msg = new MyMessage();msg.setType(type);int length = byteBuf.readInt();msg.setContentLength(length);if(length > 0){byte[] contents = new byte[length];byteBuf.readBytes(contents,0, length);ByteArrayInputStream bis=new ByteArrayInputStream(contents);ObjectInputStream ois=new ObjectInputStream(bis);msg.setContent(ois.readObject());}messages.add(msg);}
}
Message:
MyMessage.java
package zmc.com.MMessage;
import zmc.com.MMessage.zzz;
import zmc.com.MMessage.MyMessage;
import lombok.Data;
/**
 * A protocol message: a type, an optional content object and the content length.
 *
 * <p>The factory methods below do not set {@code contentLength}. Accessors are written
 * out explicitly in addition to the Lombok {@code @Data} annotation.
 */
@Data
public class MyMessage {

    /** Kind of message (PING, PONG or DATA). */
    private zzz type;

    /** Length of the content; left at 0 by the factory methods. */
    private int contentLength;

    /** Message content; {@code null} for messages created without content. */
    private Object content;

    /** @return a new message of type {@code PING} without content */
    public static MyMessage PingMessage() {
        return ofType(zzz.PING);
    }

    /** @return a new message of type {@code PONG} without content */
    public static MyMessage PongMessage() {
        return ofType(zzz.PONG);
    }

    /**
     * @param txt text to carry as the message content
     * @return a new message of type {@code DATA} whose content is {@code txt}
     */
    public static MyMessage StringMessage(String txt) {
        MyMessage message = ofType(zzz.DATA);
        message.setContent(txt);
        return message;
    }

    /** Creates an otherwise empty message of the given type. */
    private static MyMessage ofType(zzz kind) {
        MyMessage message = new MyMessage();
        message.setType(kind);
        return message;
    }

    /** @return the message type */
    public zzz getType() {
        return this.type;
    }

    /** @param type the message type */
    public void setType(zzz type) {
        this.type = type;
    }

    /** @return the stored content length */
    public int getContentLength() {
        return this.contentLength;
    }

    /** @param contentLength the content length to store */
    public void setContentLength(int contentLength) {
        this.contentLength = contentLength;
    }

    /** @return the message content, possibly {@code null} */
    public Object getContent() {
        return this.content;
    }

    /** @param content the message content */
    public void setContent(Object content) {
        this.content = content;
    }
}
www.java
package zmc.com.MMessage;

/**
 * Constants of the custom wire protocol.
 */
public interface www {

    /** Magic number that marks the start of a package. */
    short PACKAGE_MAGIC_NUM = 2021;

    /** Maximum package size in bytes: 50 MiB. */
    int PACKAGE_MAX_SIZE = 50 << 20;
}
zzz.java
package zmc.com.MMessage;

/**
 * Message types of the custom protocol. Each constant carries the one-byte code that
 * identifies it on the wire and a remark string.
 */
public enum zzz {
    PING((byte) 0, "666"),
    PONG((byte) 1, "667"),
    DATA((byte) 2, "668");

    /** One-byte code of this type. */
    private byte code;

    /** Remark attached to this type. */
    private String remark;

    zzz(byte code, String remark) {
        this.code = code;
        this.remark = remark;
    }

    /** @return the one-byte code of this type */
    public byte type() {
        return code;
    }

    /**
     * Looks up the type that has the given code.
     *
     * @param type one-byte type code
     * @return the matching constant, or {@code null} when no constant has that code
     */
    public static zzz getOpType(byte type) {
        zzz[] candidates = values();
        for (int i = 0; i < candidates.length; i++) {
            if (candidates[i].type() == type) {
                return candidates[i];
            }
        }
        return null;
    }
}
运行结果:
服务端:
客户端1:
客户端2: