文章目录
- 一、 三大组件
- 1.1 Channel & Buffer
- 1.2 Selector
- 二、 ByteBuffer字节缓存
- 2.1 结构
- 2.2 堆内存与直接内存
- 2.3 读与写
- 2.4 Scattering Reads与Gathering Writes
- 2.5 简单处理黏包与半包
- 三、FileChannel文件编程
- 3.1 读取
- 3.2 写入
- 3.3 关闭
- 3.4 位置
- 3.5 大小
- 3.6 强制写入
- 3.7 两个Channel传输数据
- 3.8 Path
- 3.8.1 操作
- 3.8.2 遍历目录文件
- 3.8.3 遍历文件下的Jar包
- 3.8.4 批量删除
- 3.8.5 批量复制
- 四、ServerSocketChannel网络编程
- 4.1 模拟阻塞模式
- 4.2 非阻塞模式
- 4.3 nio-selector处理accept
- 4.4 nio-selector处理read
- 4.4.1 用完key为什么要移除
- 4.4.2 处理异常断开与正常断开
- 4.4.3 消息边界问题与附件
- 4.4.3.1 ByteBuffer 大小的分配
- 4.4.4 写入内容与写入内容过多
- 4.5 方法
- 参考文献
non-blocking io 非阻塞IO
一、 三大组件
1.1 Channel & Buffer
Channel:是指数据传输的双向通道。
Buffer:数据暂存区域,暂存Channel中的数据。是应用程序、文件、网络之间的桥梁。
Channel有一点类似于Stream,是读写数据的双向通道。可以从Channel将数据读入Buffer,也可以Buffer数据写入Channel。Stream要么写入要么输出,Channel比Stream更底层。
常见的Channel有:
- FileChannel:文件传输通道
- DatagramChannel:UDP网络编程时的通道
- SocketChannel:TCP数据传输通道,客户端与服务器都可以用
- ServerSocketChannel:专用于服务器TCP传输通道
常见Buffer有:
- ByteBuffer:
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
- CharBuffer
Buffer是一个抽象类,所有上面所有的Buffer都是它的子类
public abstract class Buffer{//mark <= position <= limit <= capacityprivate int mark = -1;private int position = 0;private int limit;private int capacity;//直接缓冲区实现子类的数据内存地址long address;
}
1.2 Selector
选择器。我们由如下场景来理解。
对于服务器,一个客户端连接,一个socket。我们该如何管理socket?
- 多线程版本
每一个socket由线程来管理。
一旦连接多起来,线程也会很多,因为线程本身占用资源较多,将导致服务器被线程暂用过多,很快就会内存溢出。
因此就会有如下问题:
- 内存占用高
- 线程上下文切换成本高
- 只适合连接数少的场景
- 线程池版本
有线程池限制线程数目,避免上述问题
缺点
- 阻塞模式下,线程仅能处理一个socket连接。如果socket什么都没做,该线程只能等待,不能处理其他请求。只有socket断开后,才可以处理新的socket。
- 仅适合短连接场景
- selector版
selector是一个用于检测所有需求的工具。
多路复用。配合线程管理多个channel,获取channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程一直在一个线程上等待、适合连接数多,流量低的场景(low traffic)
调用selector的select()会阻塞直到channel发生了读写就绪事件,这些事件发生,select方法就会返回这些时间交给thread来处理。
二、 ByteBuffer字节缓存
创建一个项目,并创建如下测试文件
接着在单元测试里面创建一个测试文件
package com.yjx23332.netty.test;import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;public class TestByteBuffer {public static void main(String[] args){//FileChannel//输入输出流读取 或者 RandomAccessFile 读取//相对路径,从更目录开始try(FileChannel fileChannel = new FileInputStream("data.txt").getChannel()){//准备缓冲区ByteBuffer byteBuffer = ByteBuffer.allocate(30);//单位:字节//read:从Channel中读出数据,写入byteBufferfileChannel.read(byteBuffer);//打印BufferbyteBuffer.flip();//切换至读模式while(byteBuffer.hasRemaining()){//是否还有剩余维度数据byte b = byteBuffer.get();//1次1个字节System.out.print((char)b);}}catch(IOException ioException){}}
}
如果内容大于缓冲区,读取不完,则用
package com.yjx23332.netty.test;import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;public class TestByteBuffer {public static void main(String[] args){//FileChannel//输入输出流读取 或者 RandomAccessFile 读取try(FileChannel fileChannel = new FileInputStream("data.txt").getChannel()){//准备缓冲区ByteBuffer byteBuffer = ByteBuffer.allocate(10);//单位:while(true){//从Channel中读出数据,向buffer中写入,返回字节数,-1:为空int len = fileChannel.read(byteBuffer);if(len == -1){//内容为空break;}//打印BufferbyteBuffer.flip();//切换至读模式while(byteBuffer.hasRemaining()){//是否还有剩余维度数据byte b = byteBuffer.get();//1次1个字节System.out.print((char)b);}//切换为写模式//byteBuffer.compact(); 也可以进行切换为写模式byteBuffer.clear();}}catch(IOException ioException){}}
}
2.1 结构
- capacity:容量
- position:读写指针
- limit:读写限制
写模式,写入a,b,c,d,e
flip动作,变为读模式
读取3字节
clear操作,切换为写模式并清空
compact操作
我们导入如下依赖,和编写如下工具类
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><artifactId>spring-boot-starter-parent</artifactId><groupId>org.springframework.boot</groupId><version>2.6.5</version></parent><groupId>org.example</groupId><artifactId>untitled</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target></properties><dependencies><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency></dependencies>
</project>
package com.yjx23332.netty.test;import io.netty.util.internal.MathUtil;
import io.netty.util.internal.StringUtil;import java.nio.ByteBuffer;import static io.netty.util.internal.MathUtil.isOutOfBounds;
import static io.netty.util.internal.StringUtil.NEWLINE;public class ByteBufferUtil {private static final char[] BYTE2CHAR = new char[256];private static final char[] HEXDUMP_TABLE = new char[256 * 4];private static final String[] HEXPADDING = new String[16];private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];private static final String[] BYTE2HEX = new String[256];private static final String[] BYTEPADDING = new String[16];static {final char[] DIGITS = "0123456789abcdef".toCharArray();for (int i = 0; i < 256; i++) {HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];}int i;// Generate the lookup table for hex dump paddingsfor (i = 0; i < HEXPADDING.length; i++) {int padding = HEXPADDING.length - i;StringBuilder buf = new StringBuilder(padding * 3);for (int j = 0; j < padding; j++) {buf.append(" ");}HEXPADDING[i] = buf.toString();}// Generate the lookup table for the start-offset header in each row (up to 64KiB).for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {StringBuilder buf = new StringBuilder(12);buf.append(StringUtil.NEWLINE);buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));buf.setCharAt(buf.length() - 9, '|');buf.append('|');HEXDUMP_ROWPREFIXES[i] = buf.toString();}// Generate the lookup table for byte-to-hex-dump conversionfor (i = 0; i < BYTE2HEX.length; i++) {BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);}// Generate the lookup table for byte dump paddingsfor (i = 0; i < BYTEPADDING.length; i++) {int padding = BYTEPADDING.length - i;StringBuilder buf = new StringBuilder(padding);for (int j = 0; j < padding; j++) {buf.append(' ');}BYTEPADDING[i] = buf.toString();}// Generate the lookup table for byte-to-char conversionfor (i = 0; i < BYTE2CHAR.length; i++) {if (i <= 0x1f || i >= 0x7f) {BYTE2CHAR[i] = '.';} else {BYTE2CHAR[i] = (char) i;}}}/*** 打印所有内容* @param buffer*/public static void debugAll(ByteBuffer buffer) {int oldlimit = buffer.limit();buffer.limit(buffer.capacity());StringBuilder origin = new StringBuilder(256);appendPrettyHexDump(origin, buffer, 0, buffer.capacity());System.out.println("+--------+-------------------- all ------------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);System.out.println(origin);buffer.limit(oldlimit);}/*** 打印可读取内容* @param buffer*/public static void debugRead(ByteBuffer buffer) {StringBuilder builder = new StringBuilder(256);appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());System.out.println("+--------+-------------------- read -----------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());System.out.println(builder);}private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {throw new IndexOutOfBoundsException("expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length+ ") <= " + "buf.capacity(" + buf.capacity() + ')');}if (length == 0) {return;}dump.append(" +-------------------------------------------------+" +StringUtil.NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+");final int startIndex = offset;final int fullRows = length >>> 4;final int remainder = length & 0xF;// Dump the rows which have 16 bytes.for (int row = 0; row < fullRows; row++) {int rowStartIndex = (row << 4) + startIndex;// Per-row prefix.appendHexDumpRowPrefix(dump, row, rowStartIndex);// Hex dumpint rowEndIndex = rowStartIndex + 16;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(" |");// ASCII dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append('|');}// Dump the last row which has less than 16 bytes.if (remainder != 0) {int rowStartIndex = (fullRows << 4) + startIndex;appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);// Hex dumpint rowEndIndex = rowStartIndex + remainder;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(HEXPADDING[remainder]);dump.append(" |");// Ascii dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append(BYTEPADDING[remainder]);dump.append('|');}dump.append(StringUtil.NEWLINE +"+--------+-------------------------------------------------+----------------+");}private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {if (row < HEXDUMP_ROWPREFIXES.length) {dump.append(HEXDUMP_ROWPREFIXES[row]);} else {dump.append(StringUtil.NEWLINE);dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));dump.setCharAt(dump.length() - 9, '|');dump.append('|');}}public static short getUnsignedByte(ByteBuffer buffer, int index) {return (short) (buffer.get(index) & 0xFF);}
}
接下来试试使用它
package com.yjx23332.netty.test;import java.nio.ByteBuffer;import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;public class TestByteBuffer {public static void main(String[] args){ByteBuffer byteBuffer = ByteBuffer.allocate(10);byteBuffer.put((byte) 0x61);debugAll(byteBuffer);}}
我们试验一下我们上面的理论
package com.yjx23332.netty.test;import java.nio.ByteBuffer;import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;public class TestByteBuffer {public static void main(String[] args){ByteBuffer byteBuffer = ByteBuffer.allocate(10);byteBuffer.put(new byte[]{ 0x61,0x63,0x64,'a','b','c'});debugAll(byteBuffer);//获取position的结果,即index=6的值,也就是空//get()会让position移动System.out.println(byteBuffer.get());//获取index=1的值//该方式不会移动positionSystem.out.println(byteBuffer.get(0));byteBuffer.flip();System.out.println(byteBuffer.get());System.out.println(byteBuffer.get());byteBuffer.compact();debugAll(byteBuffer);}}
2.2 堆内存与直接内存
package com.yjx23332.netty.test;import java.nio.ByteBuffer;public class TestByteBuffer {public static void main(String[] args){/*** HeapByteBuffer* 使用的是Java堆内存,效率较低* 会受到GC回收的影响* */System.out.println(ByteBuffer.allocate(16).getClass());/*** DirectByteBuffer* 使用的是直接内存,效率较高,因为少一次数据的拷贝* 不会收到GC回收影响* 分配内存效率较低,因为不受GC管理,因此要自己管理,避免内存占用* */System.out.println(ByteBuffer.allocateDirect(16).getClass());}}
2.3 读与写
package com.yjx23332.netty.test;import java.nio.ByteBuffer;import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;public class TestByteBuffer {public static void main(String[] args){ByteBuffer byteBuffer = ByteBuffer.allocate(10);byteBuffer.put(new byte[]{ 'a','b','c','d','e','f','g'});byteBuffer.flip();/*** 读4个字节* get(new byte[])会让position移动*/System.out.println(byteBuffer.get(new byte[4]));debugAll(byteBuffer);//该方式不会移动positionSystem.out.println((char) byteBuffer.get(4));debugAll(byteBuffer);/*** 将 position重新设为0* */byteBuffer.rewind();//get()会让position移动System.out.println((char)byteBuffer.get());/*** mark & reset* 增强rewind* mark 做一个标记,记录position的位置* reset 将position重置到mark的位置* */System.out.println((char) byteBuffer.get());System.out.println((char) byteBuffer.get());byteBuffer.mark();System.out.println((char) byteBuffer.get());System.out.println((char) byteBuffer.get());byteBuffer.reset();System.out.println((char) byteBuffer.get());System.out.println((char) byteBuffer.get());}
}
package com.yjx23332.netty.test;import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;public class TestByteBuffer {public static void main(String[] args){// 1. 字符串转为 ByteBufferByteBuffer byteBuffer = ByteBuffer.allocate(16);//默认使用操作系统编码byteBuffer.put("hello".getBytes());//指定编码集//byteBuffer.put("hello".getBytes(StandardCharsets.UTF_8));//使用debugAll(byteBuffer);//2. Charset//转换后,自动切换到读模式//使用操作系统默认字符集 Charset.defaultCharset()//指定编码集ByteBuffer byteBuffer1 = StandardCharsets.UTF_8.encode("hello");debugAll(byteBuffer1);//3.wrap,自动切换到读模式ByteBuffer byteBuffer2 = ByteBuffer.wrap("hello".getBytes());debugAll(byteBuffer2);//byteBuffer是写模式,此时会有问题System.out.println(StandardCharsets.UTF_8.decode(byteBuffer).toString());System.out.println(StandardCharsets.UTF_8.decode(byteBuffer1).toString());System.out.println(StandardCharsets.UTF_8.decode(byteBuffer2).toString());}}
2.4 Scattering Reads与Gathering Writes
Scattering Reads分散读取:把一个文件分别读取到多个ByteBuffer之中(读取的个数已知)。
准备如下文件
package com.yjx23332.netty.test;import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;public class TestScatteringReads {public static void main(String[] args){try(FileChannel channel = new RandomAccessFile("words.txt","r").getChannel()){ByteBuffer b1 = ByteBuffer.allocate(3);ByteBuffer b2 = ByteBuffer.allocate(3);ByteBuffer b3 = ByteBuffer.allocate(5);channel.read(new ByteBuffer[]{b1,b2,b3});b1.flip();b2.flip();b3.flip();debugAll(b1);debugAll(b2);debugAll(b3);}catch (IOException ioException){}}
}
Gathering Writes集中写入
package com.yjx23332.netty.test;import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;public class TestGatheringWrites {public static void main(String[] args){ByteBuffer byteBuffer1 = StandardCharsets.UTF_8.encode("hello");ByteBuffer byteBuffer2 = StandardCharsets.UTF_8.encode("world");ByteBuffer byteBuffer3 = StandardCharsets.UTF_8.encode("你好世界");try(FileChannel fileChannel = new RandomAccessFile("words2.txt","rw").getChannel()){fileChannel.write(new ByteBuffer[]{byteBuffer1,byteBuffer2,byteBuffer3});}catch (IOException ioException){}}}
2.5 简单处理黏包与半包
在给服务器发送信息时
黏包:两个消息合在一起。发生原因:提高发送效率,一次性发多个消息。
半包:一个消息被截成两段。发送原因:数据过多,一个包装不下只能分开。
package com.yjx23332.netty.test;import java.nio.ByteBuffer;import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;public class TestByteBufferExam {public static void main(String[] args){/*** 模拟分两次接收到消息* */ByteBuffer source = ByteBuffer.allocate(32);/*** 黏包* */source.put("hello,world\nI'm zhangsan\nHo".getBytes());split(source);/*** 半包* */source.put("w are you?\n".getBytes());split(source);}private static void split(ByteBuffer source){source.flip();for(int i = 0;i < source.limit();i++){// 找到一条完整消息if(source.get(i) == '\n'){int length = i + 1 - source.position();// 把这条完整消息存入新的 ByteBufferByteBuffer target = ByteBuffer.allocate(length);//从source读,向target写for(int j = 0; j < length;j++)target.put(source.get());debugAll(target);}}//没有找到说明半包source.compact();}
}
三、FileChannel文件编程
FileChannel只能工作在阻塞模式下,这里和IO是一致的。
我们不能直接获取FileChannel,必须通过FileInputStream、FileOutputStream或者RandomAccessFile来获取FileChannel,它们都有getChannel方法。
- 通过FileInputStream获取的channel只能读
- 通过FileOutputStream获取的channel只能写
- 通过RandomAccessFile是否能读写根据构造RandomAccessFile时的读写模式决定.
3.1 读取
从channel读取数据填充ByteBuffer,返回值读到了多少字节,-1表示到达文件末尾
流程可以参考1.3开头
int readByte = channel.read(buffer);
3.2 写入
ByteBuffer buffer = ...;
buffer.put(...);//存入数据
buffer.flip();//切换读写模式
while(buffer.hasRemaining()){channel.write(buffer);
}
3.3 关闭
channel必须关闭,但调用了FileInputStream、FileOutputStream或者RandomAccessFile的close方法,也会间接的调用close方法。
3.4 位置
获取当前位置
long pos = channel.position();
设置当前位置
long newPos = ...;
channel.position(newPos);
如果设置为文件末尾,这时读取会返回-1,这是写入会追加内容
但是如果超过了末尾,在写入时,新内容和与原末尾之间会有空洞(00)
3.5 大小
size方法可获取文件大小
3.6 强制写入
操作系统出于性能的考虑,会将数据缓存,不是立即写入磁盘,可以用force(true)方法,将文件内容和元数据(文件的权限等信息)立即写入磁盘。
3.7 两个Channel传输数据
x.transferTo():x传给…数据
x.transferFrom():从…传给x数据
package com.yjx23332.netty.test;import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;public class TestFileChannelTransferTo {public static void main(String[] args){try(FileChannel from = new FileInputStream("data.txt").getChannel();FileChannel to = new FileOutputStream("to.txt").getChannel();){/*** 效率高,底层利用操作系统的零拷贝进行优化* 一次最多传输2G* */from.transferTo(0,from.size(),to);}catch (IOException ioException){ioException.printStackTrace();}}
}
由于一次最多传输2G,于是我们需要进行改进
package com.yjx23332.netty.test;import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;public class TestFileChannelTransferTo {public static void main(String[] args){try(FileChannel from = new FileInputStream("data.txt").getChannel();FileChannel to = new FileOutputStream("to.txt").getChannel();){/*** 效率高,底层利用操作系统的零拷贝进行优化* 一次最多传输2G* */long size = from.size();for(long left = size; left > 0;){/*** 返回实际传输字符* @param 位置,个数,目标*/System.out.println("position:" + (size - left) + ",left:" + left);left = left - from.transferTo(size - left,left,to);}}catch (IOException ioException){ioException.printStackTrace();}}
}
3.8 Path
jdk7引入了Path和Paths类
- Path用来表示文件路径
- Paths是工具类,用来获取Path实例
3.8.1 操作
Path source = Paths.get("1.txt"); //相对路径 使用user.dir环境变量来定位 1.txt
Path source = Paths.get("d:\\1.txt"); //绝对路径 代表了 d:\1.txt
Path source = Paths.get("d:/1.txt"); //绝对路径 代表了 d:\1.txt
Path source = Paths.get("d:\\data","projects"); //代表了 d:\data\projects
- . :当前路径
- …: 上级路径
检查文件是否存在
Files.exists(path)
创建一级目录
已经存在则报异常,且只能创建一级目录
Files.createDirectory(path);
创建多级目录
Files.createDirectories(path);
拷贝文件
如果文件存在,则会报异常FileAlreadyExistsException
用的操作系统的实现
Files.copy(sourcePath,targetPath);
如果希望用source 覆盖掉target,则要用StandardCopyOption.REPLACE_EXISTING
StandardCopyOption可以放多个,用‘,’隔开即可
Files.copy(source,target, StandardCopyOption.REPLACE_EXISTING);
移动文件
StandardCopyOption.ATOMIC_MOVE保证移动的原子性
Files.move(source,target, StandardCopyOption.ATOMIC_MOVE);
删除目录、文件
如果文件不存在,则会抛异常NoSuchFileException
如果目录中有内容,则会抛异常 DirectoryNotEmptyException
Files.delete(target);
3.8.2 遍历目录文件
package com.yjx23332.netty.test;import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.atomic.AtomicInteger;public class TestFilesWalkFileTree {public static void main(String[] args) throws IOException {AtomicInteger dirCount = new AtomicInteger();AtomicInteger fileCount = new AtomicInteger();// 访问者模式Files.walkFileTree(Paths.get("路径"),new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {System.out.println("====>"+dir);dirCount.incrementAndGet();return super.preVisitDirectory(dir, attrs);}@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {System.out.println("【"+file+"】");fileCount.incrementAndGet();return super.visitFile(file, attrs);}});System.out.println("dir count:" + dirCount);System.out.println("file count:" + fileCount);}
}
最后发现多了一个文件夹,是因为windows找到的文件夹不包含最外层的文件夹
3.8.3 遍历文件下的Jar包
package com.yjx23332.netty.test;import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.atomic.AtomicInteger;public class TestFilesWalkFileTree {public static void main(String[] args) throws IOException {AtomicInteger jarCount = new AtomicInteger();Files.walkFileTree(Paths.get("C:\\Program Files\\Java"),new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {if(file.toString().endsWith(".jar")){System.out.println(file);jarCount.incrementAndGet();}return super.visitFile(file, attrs);}});System.out.println("jar count:" + jarCount);}
}
3.8.4 批量删除
注意该方式删除会直接删除,不会进入回收站,谨慎使用。
package com.yjx23332.netty.test;import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;public class TestFilesWalkFileTree {public static void main(String[] args) throws IOException {Files.walkFileTree(Paths.get("路径"),new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {System.out.println("=====> 进入 dir:" + dir);return super.preVisitDirectory(dir, attrs);}@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {Files.delete(file);System.out.println("=====> 删除 file:" + file);return super.visitFile(file, attrs);}@Overridepublic FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {Files.delete(dir);System.out.println("=====> 删除 dir:" + dir);return super.postVisitDirectory(dir, exc);}});}
}
3.8.5 批量复制
package com.yjx23332.netty.test;import java.io.IOException;
import java.nio.file.*;public class TestFilesWalkFileTree {public static void main(String[] args) throws IOException {String source = "D:\\test1";String target = "D:\\test2";Files.walk(Paths.get(source)).forEach(path -> {try {String targetName = path.toString().replace(source, target);//如果是目录if (Files.isDirectory(path)) {Files.createDirectory(Paths.get(targetName));} else {//如果是文件Files.copy(path, Paths.get(targetName));}}catch (IOException ioException){ioException.printStackTrace();}});}
}
四、ServerSocketChannel网络编程
为方便打印,我们引入
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
4.1 模拟阻塞模式
我们故意在以下代码中使用多次阻塞模式
同时以debug模式,运行下列两个代码文件
客户端在此处打上断点
package com.yjx23332.netty.test;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;@Slf4j
public class Server {public static void main(String[] args) throws IOException {//阻塞模式//0. 声明字节缓存ByteBuffer buffer = ByteBuffer.allocate(16);//1.创建服务器ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//2.绑定监听端口serverSocketChannel.bind(new InetSocketAddress(8080));//3.连接集合List<SocketChannel> channels = new ArrayList<>();while(true){//4. accept建立与客户端连接,SocketChannel用来与客户端通信log.debug("connecting...");/*** 在没有连接建立时,线程停止运行,阻塞在这里* 每一次建立连接之后,才会继续执行* */SocketChannel socketChannel = serverSocketChannel.accept();log.debug("connected...{}",socketChannel);channels.add(socketChannel);for(SocketChannel channel:channels){//5. 接收客户端发送的数据log.debug("before read..{}",channel);/*** 也是阻塞方法,线程也会在这里等待读入数据* 客户端没有发送数据,则会在这里停止*/channel.read(buffer);//6. 调试bufferbuffer.flip();debugAll(buffer);buffer.clear();log.debug("after read...{}",channel);}}}
}
package com.yjx23332.netty.test;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;public class Client {public static void main(String[] args) throws IOException {SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress("localhost",8080));System.out.println("waiting");}
}
在客户端中,对其通道我们手动填写数据,看服务器反应。
发送如下数据
服务器部分
如果我们再次发送,将无法继续执行。因为在accept处阻塞了。正常情况,我们用一个线程来管理一个accept,这就和我们之前的线程和线程池的架构。
4.2 非阻塞模式
修改服务端
package com.yjx23332.netty.test;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;@Slf4j
public class Server {public static void main(String[] args) throws IOException {//非阻塞模式//0. 声明字节缓存ByteBuffer buffer = ByteBuffer.allocate(16);//1.创建服务器ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//2. 切换为非阻塞模式serverSocketChannel.configureBlocking(false);//3.绑定监听端口serverSocketChannel.bind(new InetSocketAddress(8080));//4.连接集合List<SocketChannel> channels = new ArrayList<>();while(true){//5. accept建立与客户端连接,SocketChannel用来与客户端通信//log.debug("connecting...");/*** 在没有连接建立时,线程将不会停下来* 此时返回的是null* */SocketChannel socketChannel = serverSocketChannel.accept();if(socketChannel != null){log.debug("connected...{}",socketChannel);//6. 设置为非阻塞模式socketChannel.configureBlocking(false);channels.add(socketChannel);}for(SocketChannel channel:channels){//7. 接收客户端发送的数据//log.debug("before read..{}",channel);/*** 客户端没有发送数据,也不会在这里停止* 返回内容为0*/int read = channel.read(buffer);if(read == 0)continue;//8. 调试bufferbuffer.flip();debugAll(buffer);buffer.clear();log.debug("after read...{}",channel);}}}
}
4.3 nio-selector处理accept
在没有连接和输入时,CPU就一直在那里空转,浪费资源。通过Selector让CPU在没有事件要处理时,就休息。
首先我们先了解一下,事件类型
事件 | 解释 |
---|---|
accept | 有连接请求时,触发 |
connect | 客户端侧连接建立后,触发 |
read | 客户端有数据发出,服务器有数据可读时,触发 |
write | 可写事件,服务器写入时,触发 |
package com.yjx23332.netty.test;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;@Slf4j
public class Server {public static void main(String[] args) throws IOException {//1. 创建selector,管理多个ChannelSelector selector = Selector.open();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);//2. 建立 selector 和 channel 的联系 (注册)/*** SelectionKey 就是事件发生后,通过他可以知道事件发生了以及是哪个channel事件关注了* 此处就是serverSocketChannel关注了SelectionKey.OP_ACCEPT事件,注册在selector* @param selector,关注的事件(0就是都不关注),附件* */SelectionKey sscKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,null);log.debug("register key:{}",sscKey);serverSocketChannel.bind(new InetSocketAddress(8080));while(true){//3. select 方法,事件未发生阻塞在此处。一旦事件发生,线程恢复,继续运行selector.select();//4. 处理事件,获取事件keysIterator<SelectionKey> iter = selector.selectedKeys().iterator();while(iter.hasNext()){SelectionKey key = iter.next();log.debug("key:{}",key);ServerSocketChannel SSChannel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = SSChannel.accept();log.debug("连接建立:{}",socketChannel);}log.debug("connected");}}
}
可以看到,有多个连接,但他们的key是一致的。
- 有未处理完的事件将不会阻塞,如果我们在
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while(iter.hasNext()){SelectionKey key = iter.next();log.debug("key:{}",key);
// ServerSocketChannel SSChannel = (ServerSocketChannel) key.channel();
// SocketChannel socketChannel = SSChannel.accept();
// log.debug("连接建立:{}",socketChannel);}
后什么都不做,他会认为我们还没有处理完,就会继续处理,就不会阻塞
- 反之,处理之后,就会阻塞
我们也可以取消事件
key.cancel();
也就是说,事件要么处理要么取消,不能置之不理。
4.4 nio-selector处理read
package com.yjx23332.netty.test;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;@Slf4j
public class Server {public static void main(String[] args) throws IOException {//1. 创建selector,管理多个ChannelSelector selector = Selector.open();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);//2. 建立 selector 和 channel 的联系 (注册)SelectionKey sscKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,null);log.debug("register key:{}",sscKey);serverSocketChannel.bind(new InetSocketAddress(8080));while(true){//3. select 方法,事件未发生阻塞在此处。一旦事件发生,线程恢复,继续运行selector.select();//4. 处理事件,获取事件keysIterator<SelectionKey> iter = selector.selectedKeys().iterator();while(iter.hasNext()){SelectionKey key = iter.next();log.debug("key:{}",key);//5. 区分事件if(key.isAcceptable()){ServerSocketChannel SSChannel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = SSChannel.accept();socketChannel.configureBlocking(false);log.debug("连接建立:{}",socketChannel);SelectionKey scKey = socketChannel.register(selector,SelectionKey.OP_READ,null);log.debug("register read key:{}",scKey);}else if(key.isReadable()){SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(16);socketChannel.read(buffer);buffer.flip();debugAll(buffer);}}}}
}
运行后传入值使用read之后会报错,这是因为我们没有移除key。可以看到,我们这里报错报的是我们accept事件发生了,但是此时我们没有连接请求,因此可以加入的为空。
4.4.1 用完key为什么要移除
那么为什么会触发accept事件?
注意,下图中绘制有遗漏,accept是属于sscKey,read是属于scKey。
package com.yjx23332.netty.test;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;@Slf4j
public class Server {public static void main(String[] args) throws IOException {//1. 创建selector,管理多个ChannelSelector selector = Selector.open();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);//2. 建立 selector 和 channel 的联系 (注册)SelectionKey sscKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,null);log.debug("register key:{}",sscKey);serverSocketChannel.bind(new InetSocketAddress(8080));while(true){//3. select 方法,事件未发生阻塞在此处。一旦事件发生,线程恢复,继续运行selector.select();//4. 处理事件,获取事件keysIterator<SelectionKey> iter = selector.selectedKeys().iterator();while(iter.hasNext()){SelectionKey key = iter.next();//5. 处理 key 时,从集合中移除iter.remove();log.debug("key:{}",key);//6. 区分事件if(key.isAcceptable()){ServerSocketChannel SSChannel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = SSChannel.accept();socketChannel.configureBlocking(false);log.debug("连接建立:{}",socketChannel);SelectionKey scKey = socketChannel.register(selector,SelectionKey.OP_READ,null);log.debug("register read key:{}",scKey);}else if(key.isReadable()){SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(16);socketChannel.read(buffer);buffer.flip();debugAll(buffer);}}}}
}
成功处理
4.4.2 处理异常断开与正常断开
我们发现,如果强制关闭Client,服务器会报错关闭。这显然是有问题的,因为我们不能因为一个客户端掉线就断了整个服务。
那么为什么会报错?
- 当客户端连接异常断开时,为了让服务器知道断开了连接,会产生OP_READ事件。但此时消息无法被读取,因此报读写错误。
- 如果只是简单的try,catch,就会认为没有处理,接着又会进入到下一次事件触发中,然后read又一次报错,一直循环下去。
- 因此我们还要cancel掉该事件
对于正常断开,也会触发OP_READ事件,但是它的消息是正规的,不过结果是长度-1。我们通过判断是否为-1来决定是否是断开连接。
package com.yjx23332.netty.test;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;@Slf4j
public class Server {public static void main(String[] args) throws IOException {//1. 创建selector,管理多个ChannelSelector selector = Selector.open();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);//2. 建立 selector 和 channel 的联系 (注册)SelectionKey sscKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,null);log.debug("register key:{}",sscKey);serverSocketChannel.bind(new InetSocketAddress(8080));while(true){//3. select 方法,事件未发生阻塞在此处。一旦事件发生,线程恢复,继续运行selector.select();//4. 处理事件,获取事件keysIterator<SelectionKey> iter = selector.selectedKeys().iterator();while(iter.hasNext()){SelectionKey key = iter.next();//5. 处理 key 时,从集合中移除iter.remove();log.debug("key:{}",key);//6. 区分事件if(key.isAcceptable()){ServerSocketChannel SSChannel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = SSChannel.accept();socketChannel.configureBlocking(false);log.debug("连接建立:{}",socketChannel);SelectionKey scKey = socketChannel.register(selector,SelectionKey.OP_READ,null);log.debug("register read key:{}",scKey);}else if(key.isReadable()){try {SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(16);if(socketChannel.read(buffer) == -1){key.cancel();continue;}buffer.flip();debugAll(buffer);}catch (IOException ioException){ioException.printStackTrace();key.cancel(); //注销该事件,从selector集合中删除}}}}}
}
package com.yjx23332.netty.test;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;public class Client {public static void main(String[] args) throws IOException {SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress("localhost",8080));System.out.println("waiting");socketChannel.close();}
}
4.4.3 消息边界问题与附件
消息边界,因为消息的长度不缺定,预先创建的缓存过小,导致一个消息被分为两次传输。或者缓存过大,两个消息合在一起了。(半包和黏包)
解决思路
- 固定消息长度,数据包大小一样,服务器按预定长度读取,缺点就是浪费带宽
- 按找分隔符号分割字符。我们通过分隔符好来确定是否获取完整。同时需要一个临时ByteBuffer,但如果消息比ByteBuffer长,同样要考虑扩容。需要一个字节一个字节地找,因此效率也不是很高
- 报消息分为两部分,前半部分存储了后续内容的长度,随后在发送内容。服务器先读一个整型(比如)那么服务器就分配该整型的大小,随后再接收内容。缺点是如果内容过大,则影响server吞吐量。
第三种方式,很类似TLV和LTV格式:type类型、Length长度、Value数据。在type类型,Length长度已知情况下,较方便的分配和获取消息。
- Http 1.1 是TLV格式
- Http 2.0 是LTV格式
当读不完的时候,会自动读取多次。
我们使用方法2,方法3则在Netty部分。
package com.yjx23332.netty.test;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;public class Client {public static void main(String[] args) throws IOException {SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress("localhost",8080));socketChannel.write(Charset.defaultCharset().encode("hello\nworld!\nThis\nis\na\nnew\nday\n"));socketChannel.close();}
}
package com.yjx23332.netty.test;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;@Slf4j
public class Server {public static void main(String[] args) throws IOException {//1. 创建selector,管理多个ChannelSelector selector = Selector.open();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);//2. 建立 selector 和 channel 的联系 (注册)SelectionKey sscKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, null);log.debug("register key:{}", sscKey);serverSocketChannel.bind(new InetSocketAddress(8080));while (true) {//3. select 方法,事件未发生阻塞在此处。一旦事件发生,线程恢复,继续运行selector.select();//4. 处理事件,获取事件keysIterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();//5. 处理 key 时,从集合中移除iter.remove();log.debug("key:{}", key);//6. 区分事件if (key.isAcceptable()) {ServerSocketChannel SSChannel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = SSChannel.accept();socketChannel.configureBlocking(false);log.debug("连接建立:{}", socketChannel);//7. 添加附件/*** buffer与socketChannel关联,buffer的声明周期,将和绑定的SelectionKey相同* */SelectionKey scKey = socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(4));log.debug("register read key:{}", scKey);} else if (key.isReadable()) {try {SocketChannel socketChannel = (SocketChannel) key.channel();//8. 获取附件/*** 如果容量不够,就创建一个新的去重新关联即可* key.attach(buffer)* */ByteBuffer buffer = (ByteBuffer) key.attachment();if (socketChannel.read(buffer) == -1) {key.cancel();continue;}split(buffer);//9. 如果切割后,当前位置和最大位置相同,说明需要扩容if(buffer.position() == buffer.limit()){ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() << 1);buffer.flip();newBuffer.put(buffer);key.attach(newBuffer);}} catch (IOException ioException) {ioException.printStackTrace();key.cancel(); //注销该事件,从selector集合中删除}}}}}private static void split(ByteBuffer source){source.flip();for(int i = 0;i < source.limit();i++){// 找到一条完整消息if(source.get(i) == '\n'){int length = i + 1 - source.position();// 把这条完整消息存入新的 ByteBufferByteBuffer target = ByteBuffer.allocate(length);//从source读,向target写for(int j = 0; j < length;j++)target.put(source.get());debugAll(target);}}//没有找到说明半包source.compact();}
}
4.4.3.1 ByteBuffer 大小的分配
每个channel都需要记录可能被切分的消息,因为Bytebuffer不能被多个channel共同使用,因此需要为Chanel维护一个独立的ByteBuffer
ByteBuffer不能太大,我们上面只考虑扩容,而Netty还做到了缩容。
- 一种思路是先分配较小的buffer,如果过发现数据不够,在分配更大的。有点事消息连续容易处理,但是会耗费性能
- 可以用数组组成buffer,发现不够,就把多出来的数据写入新的数组,与前面的消息不连续解析比较复杂,优点是避免了拷贝引起的性能损耗
4.4.4 写入内容与写入内容过多
当我们写入数据过多的时候,网络缓冲区会写满,于是就会不断尝试。我们可以通过SelectionKey来判断,如果有空间可以继续写的时候,再继续。而不是不断地尝试。
package com.yjx23332.netty.test;import org.springframework.expression.spel.ast.Selection;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;public class WriteServer {public static void main(String[] args) throws IOException {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);Selector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);serverSocketChannel.bind(new InetSocketAddress("localhost",8080));while(true){selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while(iter.hasNext()){SelectionKey key = iter.next();iter.remove();if(key.isAcceptable()){SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);SelectionKey scKey = socketChannel.register(selector,0,null);//1. 向客户端发送大量数据StringBuilder stringBuilder = new StringBuilder();for(int i = 0;i < 300000000;i++){stringBuilder.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(stringBuilder.toString());//2. 先写入尝试全部写入channelint write = socketChannel.write(buffer);//输出实际写入多少System.out.println(write);//3. 如果过缓冲区还有剩余内容if(buffer.hasRemaining()){//4. 关注可写事件/*** 避免新的关注把原来的关注覆盖了,我们可以用如下两种方式进行* */scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);//scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE);//5. 将未写完的数据挂在到其中scKey.attach(buffer);}}else if(key.isReadable()) {SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();int write = socketChannel.write(buffer);System.out.println(write);//6. 清理工作if (!buffer.hasRemaining()){key.attach(null);//7. 不再关注可写事件key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);}}}}}
}
package com.yjx23332.netty.test;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;public class Client {public static void main(String[] args) throws IOException {SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress("localhost",8080));//接收数据int count = 0;ByteBuffer buffer = ByteBuffer.allocate(1024*1024);while(true){count += socketChannel.read(buffer);System.out.println(count);buffer.clear();}}
}
4.5 方法
阻塞直到绑定发生
int count = selector.select();
阻塞直到绑定事件发生 ,或者超时
int count = selector.select(long timeout);
不会则色,不管有没有事件发生,都立即返回,自己根据返回值进行判
int count = selector.selectNow();
count是事件数目。
参考文献
[1]黑马程序员Netty全套教程