自定义数据包
粘包现象:两个数据包连在一起,导致无法区分。
分包现象:一个数据包中的数据被间隔。
粘包和分包出现的原因是:没有一个稳定数据结构。
数据包的结构:
自定义数据包=包头+模块号+命令号+长度+数据
包头:4个字节
模块号:2个字节short,区分请求类型
命令号:2个字节short,区分具体请求
长度:4个字节,描述数据部分字节长度
数据:更加详细的信息
通过序列化方式将下列数据,转化成字节序列
包头:通常是一个固定的量,使数据更加稳定
自定义请求(响应)=模块号+命令号+请求数据部分
以使用Netty ChannelBuffer序列化为例
请求编码器
public class RequestEncoder extends OneToOneEncoder{
@Override
protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception {Request request = (Request)(rs);ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();//包头buffer.writeInt(ConstantValue.FLAG);//modulebuffer.writeShort(request.getModule());//cmdbuffer.writeShort(request.getCmd());//长度buffer.writeInt(request.getDataLength());//dataif(request.getData() != null){buffer.writeBytes(request.getData());} return buffer;
}
}
请求解码器
frameDecoder协助解决粘包、分包问题
public class RequestDecoder extends FrameDecoder{/*** 数据包基本长度*/public static int BASE_LENTH = 4 + 2 + 2 + 4;
@Override
protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception {//可读长度必须大于基本长度if(buffer.readableBytes() >= BASE_LENTH){//防止socket字节流攻击if(buffer.readableBytes() > 2048){buffer.skipBytes(buffer.readableBytes());}//记录包头开始的indexint beginReader;while(true){beginReader = buffer.readerIndex();buffer.markReaderIndex();if(buffer.readInt() == ConstantValue.FLAG){break;}//未读到包头,略过一个字节buffer.resetReaderIndex();buffer.readByte();//长度又变得不满足if(buffer.readableBytes() < BASE_LENTH){return null;}}//模块号short module = buffer.readShort();//命令号short cmd = buffer.readShort();//长度int length = buffer.readInt();//判断请求数据包数据是否到齐if(buffer.readableBytes() < length){//还原读指针buffer.readerIndex(beginReader);return null;}//读取data数据byte[] data = new byte[length];buffer.readBytes(data);Request request = new Request();request.setModule(module);request.setCmd(cmd);request.setData(data);//继续往下传递 return request;}//数据包不完整,需要等待后面的包来return null;
}
}
结合Netty进行数据传递(选择自定义的报文结构)
客户端
pipeline.addLast(“decoder”, new ResponseDecoder()); 用来解析响应,addLast追加
pipeline.addLast(“encoder”, new RequestEncoder()); 用来编码请求
在request中定义数据时,可以继承使用PB的序列化方式,或者自定义的序列化方式 request.setData(fightRequest.getBytes());
public class Client {public static void main(String[] args) throws InterruptedException {//服务类ClientBootstrap bootstrap = new ClientBootstrap();//线程池ExecutorService boss = Executors.newCachedThreadPool();ExecutorService worker = Executors.newCachedThreadPool();//socket工厂bootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));//管道工厂bootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() throws Exception {ChannelPipeline pipeline = Channels.pipeline();pipeline.addLast("decoder", new ResponseDecoder());pipeline.addLast("encoder", new RequestEncoder());pipeline.addLast("hiHandler", new HiHandler());return pipeline;}});//连接服务端ChannelFuture connect = bootstrap.connect(new InetSocketAddress("127.0.0.1", 10101));Channel channel = connect.sync().getChannel();System.out.println("client start");Scanner scanner = new Scanner(System.in);while(true){System.out.println("请输入");int fubenId = Integer.parseInt(scanner.nextLine());int count = Integer.parseInt(scanner.nextLine());FightRequest fightRequest = new FightRequest();fightRequest.setFubenId(fubenId);fightRequest.setCount(count);Request request = new Request();request.setModule((short) 1);request.setCmd((short) 1);request.setData(fightRequest.getBytes());//发送请求channel.write(request);}
}
}
服务器端
pipeline.addLast(“decoder”, new RequestDecoder()); 用来解析请求
pipeline.addLast(“encoder”, new ResponseEncoder()); 用来编码响应
public class Server {public static void main(String[] args) {//服务类ServerBootstrap bootstrap = new ServerBootstrap();//boss线程监听端口,worker线程负责数据读写ExecutorService boss = Executors.newCachedThreadPool();ExecutorService worker = Executors.newCachedThreadPool();//设置niosocket工厂bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));//设置管道的工厂bootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() throws Exception {ChannelPipeline pipeline = Channels.pipeline();pipeline.addLast("decoder", new RequestDecoder());pipeline.addLast("encoder", new ResponseEncoder());pipeline.addLast("helloHandler", new HelloHandler());return pipeline;}});bootstrap.bind(new InetSocketAddress(10101));System.out.println("start!!!");}}
知识补充:
TCP数据包结构=偏移量+窗口字段+端口号,总共占32个字节
上面的自定义的最基础的占用12个字节