Java NIO由浅入深(作者原创)

网友投稿 643 2022-05-29

个人简介

NIO三大组件

Java NIO的核心:通道(Channel)和缓冲区(Buffer),通道是用来传输数据的,缓冲区是存储数据的。

常见的Channel有以下四种,其中FileChannel主要用于文件传输,其余三种用于网络通信。

FileChannel

SocketChannel

DatagramChannel

ServerSocketChannel

Buffer有几种,使用最多的是ByteBuffer

ByteBuffer

MappedByteBuffer

DirectByteBuffer

HeapByteBuffer

ShortBuffer

IntBuffer

LongBuffer

FloatBuffer

DoubleBuffer

CharBuffer

8大基本数据类型除了boolean没有Buffer,其余的7种基本类型都有

未使用Selector之前,有如下几种方案

1.多线程技术

实现逻辑 :每一个连接进来都开一个线程去处理Socket。

缺点:

如果同时有100000个(大量)连接进来,系统大概率是挡不住的,而且线程会占用内存,会导致内存不足。

线程需要进行上下文切换,成本高

2.采用线程池技术

实现逻辑 :创建一个固定大小(系统能够承载的线程数)的线程池对象,去处理连接的请求,假如线程池大小为

100个线程数,这时候同时并发连接1000个Socket,此时只有100个Socket会得到处理,其余的会阻塞。这样很好的防止了系统线程数

过多导致线程占用内存大,不容易导致系统由于内存占用的问题而崩溃。

相对于第一种多线程技术处理客户端Socket,第二种方案使用线程池去处理连接会更好,但是还是不够好

缺点:

阻塞模式下,线程仅能处理一个连接,若socket连接一直未断开,则该线程无法处理其他socket。

3.使用Selector选择器

selector的作用就是配合一个线程来管理多个channel,获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,当一个channel中没有执行任务时,可以去执行其他channel中的任务

注意:fileChannel因为是阻塞式的,所以无法使用selector

使用场景:适合连接数多,但流量较少的场景

流程: 假如当前Selector绑定的Channels没有任何一个Channel触发了感兴趣的事件,

则selector的select()方法会阻塞线程,直到channel触发了事件。这些事件发生后,select方法就会返回这些事件交给thread来处理。

区别:

IO是面向流的,NIO是面向缓冲区(块)的

Java IO的各种流是阻塞的,而Java NIO是非阻塞的

Java NIO的选择器允许一个单独的线程来监视多个输入通道

普通io读取文件

@Test public void test01(){ try { FileInputStream fileInputStream = new FileInputStream("data.txt"); long start = System.currentTimeMillis(); byte bytes[]=new byte[1024]; int n=-1; while ((n=fileInputStream.read(bytes,0,1024))!=-1){ String s = new String(bytes,0,n,"utf-8"); System.out.println(s); } long end = System.currentTimeMillis(); System.out.println("普通io共耗时:"+(end-start)+"ms"); } catch (Exception e) { e.printStackTrace(); } }

缓冲流IO读取文件

@Test public void test02(){ try { BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream("data.txt")); long start = System.currentTimeMillis(); byte bytes[]=new byte[1024]; int n=-1; while ((n=bufferedInputStream.read(bytes,0,1024))!=-1){ String s = new String(bytes,0,n,"utf-8"); System.out.println(s); } long end = System.currentTimeMillis(); System.out.println("缓冲流io共耗时:"+(end-start)+"ms"); } catch (Exception e) { e.printStackTrace(); } }

Nio-FileChannel读取文件

//方式1 @Test public void test3(){ try { //获取channel,FileInputStream生成的channel只有读的权利 FileChannel channel = new FileInputStream("data.txt").getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //开辟一块缓冲区 long start = System.currentTimeMillis(); while (true){ //写入操作 int read = channel.read(byteBuffer); //如果read=-1,说明缓存“块”没有数据了 if(read==-1){ break; }else { byteBuffer.flip();//读写切换,切换为读的操作,实质上就是把limit=position,position=0 String de = StandardCharsets.UTF_8.decode(byteBuffer).toString(); System.out.println(de); byteBuffer.clear(); //切换为写 } } long end = System.currentTimeMillis(); System.out.println("heap nio共耗时:"+(end-start)+"ms"); } catch (Exception e) { e.printStackTrace(); } } //方式2 @Test public void test4(){ ByteBuffer byteBuffer = ByteBuffer.allocate(10); byteBuffer.put("helloWorld".getBytes()); debugAll(byteBuffer); byteBuffer.flip(); //读模式 while (byteBuffer.hasRemaining()){ System.out.println((char)byteBuffer.get()); } byteBuffer.flip(); System.out.println(StandardCharsets.UTF_8.decode(byteBuffer).toString()); }

创建ByteBuffer缓冲区:

ByteBuffer.allocate(int capacity)

ByteBuffer.allocateDirect(int capacity)

ByteBuffer.wrap(byte[] array,int offset, int length)

ByteBuffer常用方法:

get()

get(int index)

put(byte b)

put(byte[] src)

limit(int newLimit)

mark()

reset()

clear()

flip()

compact()

字符串转换成ByteBuffer

ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("hello world\nabc\n\baaa");

Java NIO由浅入深(作者原创)

ByteBuffer转换成String

String str = StandardCharsets.UTF_8.decode(byteBuffer).toString();

整个Demo

@Test public void test5(){ //字符串转换成ByteBuffer ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("hello world\nabc\n\baaa"); //通过StandardCharsets的encode方法获得ByteBuffer,此时获得的ByteBuffer为读模式,无需通过flip切换模式 // byteBuffer.flip(); //这句话不能加,encode转换成ByteBuffer默认是读模式 while (byteBuffer.hasRemaining()){ System.out.printf("%c",(char)byteBuffer.get()); } byteBuffer.flip(); //ByteBuffer转换成String String str = StandardCharsets.UTF_8.decode(byteBuffer).toString(); System.out.println("\n--------------"); System.out.println(str); }

@Test public void test6(){ String msg = "hello,world\nI'm abc\nHo"; ByteBuffer byteBuffer = ByteBuffer.allocate(32); byteBuffer.put(msg.getBytes()); byteBuffer=splitGetBuffer(byteBuffer); byteBuffer.put("w are you?\n".getBytes()); //多段发送数据 byteBuffer=splitGetBuffer(byteBuffer); byteBuffer.put("aa bccdd?\n".getBytes()); //多段发送数据 byteBuffer=splitGetBuffer(byteBuffer); } private ByteBuffer splitGetBuffer(ByteBuffer byteBuffer) { byteBuffer.flip(); StringBuilder stringBuilder = new StringBuilder(); int index=-1; for (int i = 0; i < byteBuffer.limit(); i++) { if(byteBuffer.get(i)!='\n'){ //get(i)不会让position+1 stringBuilder.append((char) byteBuffer.get(i)); }else{ index=i; //记录最后一个分隔符下标 String data = stringBuilder.toString(); ByteBuffer dataBuf = ByteBuffer.allocate(data.length()); dataBuf.put(data.getBytes()); dataBuf.flip(); debugAll(dataBuf); dataBuf.clear(); stringBuilder=new StringBuilder(); } } ++index; ByteBuffer temp = ByteBuffer.allocate(byteBuffer.capacity()); for (;index

文件编程

因为FileChannel只能工作在阻塞环境下,而Selector是非阻塞的,所以FileChannel无法注册到Selector里面去。

FileChannel不能直接打开,一定要用FileInputStream或者FileOutputStream或者RandomAccessFile来获取FileChannel对象,

使用getChannel方法即可。

注意以下几点:

通过FileInputStream获取的channel只能读

通过FileOutputStream获取的channel只能写

通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定

通过 FileInputStream 获取channel,通过read方法将数据写入到ByteBuffer中,read方法的返回值表示读到了多少字节,若读到了文件末尾则返回-1

int read = channel.read(buffer);

因为channel也是有大小的,所以 write方法并不能保证一次将 buffer中的内容全部写入channel。必须需要按照以下规则进行写入

// 通过hasRemaining()方法查看缓冲区中是否还有数据未写入到通道中 while(buffer.hasRemaining()) { channel.write(buffer); }

操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘,而是等到缓存满了以后将所有数据一次性的写入磁盘。可以调用force(true)方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘

//方法一: FileInputStream fileInputStream = new FileInputStream("data.txt"); //读的通道 FileChannel from = fileInputStream.getChannel(); FileOutputStream fileInputStream1 = new FileOutputStream("to.txt"); //写的通道 FileChannel to = fileInputStream1.getChannel(); long l = from.transferTo(0, from.size(), to); //方法二: RandomAccessFile r1 = new RandomAccessFile("data.txt", "rw"); //都开启rw权限 FileChannel from1 = r1.getChannel(); RandomAccessFile r2 = new RandomAccessFile("to.txt", "rw"); FileChannel to2 = r2.getChannel(); from1.transferTo(0,r1.length(),to2);

使用transferTo方法可以快速、高效地将一个channel中的数据传输到另一个channel中,但一次只能传输2G的内容,

transferTo方法的底层使用了零拷贝技术,

Path用来表示文件路径

Paths是工具类,用来获取Path实例

Path path = Paths.get("data.txt"); Path path1 = Paths.get("D:\\java code\\netty-study\\data.txt");

Path path = Paths.get("data.txt"); boolean exists = Files.exists(path);

createDirectory(path)

如果文件夹已存在,则会报错。FileAlreadyExistsException,

此方法只能创建一级目录,如果用此方法创建多级目录则会报错NoSuchFileException。

Path path = Paths.get("D:\\img"); Path directory = Files.createDirectory(path);

createDirectories(path)

Path path = Paths.get("D:\\img\\a\\b"); Path directories = Files.createDirectories(path);

//这种方式如果目标文件‘to’存在则会报错FileAlreadyExistsException Path from = Paths.get("data.txt"); Path to = Paths.get("D:\\img\\target.txt"); //文件名也要写 Files.copy(from,to); //只需要加StandardCopyOption.REPLACE_EXISTING就不会报错,因为它会直接替换掉目标文件 Path from = Paths.get("data.txt"); Path path = Paths.get("D:\\img\\target.txt"); //文件名也要写 Files.copy(from,path, StandardCopyOption.REPLACE_EXISTING);

Path source = Paths.get("data.txt"); Path target = Paths.get("D:\\img\\target.txt"); Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);

StandardCopyOption.ATOMIC_MOVE保证文件移动的原子性

Path target = Paths.get("D:\\img\\target.txt"); Files.delete(target); //删除文件

walkFileTree(Path, FileVisitor)方法

Path:文件起始路径

FileVisitor:文件访问器,使用访问者模式,这个接口有如下方法

preVisitDirectory:访问目录前的操作

visitFile:访问文件的操作

visitFileFailed:访问文件失败时的操作

postVisitDirectory:访问目录后的操作

Path target = Paths.get("D:\\cTest"); Files.walkFileTree(target,new SimpleFileVisitor(){ @Override public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { System.out.println("1:"+dir); return super.preVisitDirectory(dir, attrs); } @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { System.out.println("2:"+file); return super.visitFile(file, attrs); } });

网络编程

这里有一段简易的通信代码:

服务器端:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //打开serverSocketChannel serverSocketChannel.bind(new InetSocketAddress(8080)); while (true){ System.out.println("waiting....."); SocketChannel socketChannel = serverSocketChannel.accept(); //阻塞 System.out.println("connect success"); ByteBuffer byteBuffer = ByteBuffer.allocate(100); socketChannel.read(byteBuffer); //阻塞,等待消息发送过来即可封装到缓存里去 byteBuffer.flip(); System.out.println(StandardCharsets.UTF_8.decode(byteBuffer).toString()); }

客户端:

SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress( 8080)); ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("this is nio"); socketChannel.write(byteBuffer);

实际上,这个和以前的IO+Socket进行通信是一样的,都是属于阻塞状态。

configureBlocking(false)

可以通过ServerSocketChannel的configureBlocking(false)方法将获得连接设置为非阻塞的。此时若没有连接,accept会返回null,

可以通过SocketChannel的configureBlocking(false)方法将从通道中读取数据设置为非阻塞的。若此时通道中没有数据可读,read会返回-1

服务器端:

ByteBuffer byteBuffer = ByteBuffer.allocate(100); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //打开通道 serverSocketChannel.bind(new InetSocketAddress(8082)); //由于accept方法是阻塞的,我们只需要一行代码就能让它变成非阻塞的 //开启非阻塞的之后accept方法如果没有连接到客户端就会从阻塞变成返回'null' serverSocketChannel.configureBlocking(false);//开启非阻塞 while (true){ // System.out.println("waiting..."); SocketChannel socketChannel = serverSocketChannel.accept(); //阻塞方法 // System.out.println(socketChannel); if(socketChannel!=null){ System.out.println("等待读取"); socketChannel.configureBlocking(false); //设置SocketChannel为非阻塞 int read = socketChannel.read(byteBuffer);//阻塞方法 System.out.println("读取到"+read+"字节"); if(read>0){ byteBuffer.flip(); System.out.println(StandardCharsets.UTF_8.decode(byteBuffer).toString()); } } }

客户端:

SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(8082)); ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("hello"); socketChannel.write(byteBuffer);

Selector是基于事件驱动的

单线程可以配合Selector完成对多个Channel读写事件的监控,这称之为多路复用。

注意:

多路复用只能用于网络IO上,文件IO由于只能处于阻塞环境下才能进行,所以无法多路复用

如果不用Selector的非阻塞模式,线程大部分时间都在做无用功,而Selector能够保证以下几点

有可连接事件时才去连接

有可读事件才去读取

有可写事件才去写入

进入SelectionKey这个类可以看到:

public static final int OP_READ = 1 << 0; //read事件 public static final int OP_WRITE = 1 << 2; //write事件 public static final int OP_CONNECT = 1 << 3; //connect事件 public static final int OP_ACCEPT = 1 << 4; //accept事件

select()

select方法会一直阻塞直到绑定事件发生

服务器端:

Selector selector = Selector.open(); // 创建选择器 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8081)); serverSocketChannel.configureBlocking(false); // 通道必须是非阻塞的 serverSocketChannel.register( selector, SelectionKey.OP_ACCEPT); // 把channel注册到selector,并选择accept事件 for (; ; ) { selector.select(); // 选择事件,此时会阻塞,当事件发生时会自动解除阻塞 System.out.println("begin"); // 遍历事件发生的集合,获取对应事件 selector .selectedKeys() .forEach( selectionKey -> { if (selectionKey.isAcceptable()) { try { SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("已连接"); // 处理完之后记得在发生事件的集合中移除该事件 selector.selectedKeys().remove(selectionKey); } catch (IOException e) { e.printStackTrace(); } } }); }

原生NIO是真tmd难用,恶心

当accept事件处理之后立刻设置read事件,但不处理read事件,因为用户可能只是连接,但是没有写数据,所以要基于事件触发

别忘了accept事件处理之后要设置为非阻塞模式configureBlocking(false)

Selector selector = Selector.open(); // 创建选择器 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8081)); serverSocketChannel.configureBlocking(false); // 通道必须是非阻塞的 serverSocketChannel.register( selector, SelectionKey.OP_ACCEPT); // 把channel注册到selector,并选择accept事件 try { while (true) { int count = selector.select(); // 选择事件,此时会阻塞,当事件发生时会自动解除阻塞 Set selectionKeys = selector.selectedKeys(); Iterator iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); if (selectionKey.isAcceptable()) { // 处理accept事件 try { ServerSocketChannel serverSocket = (ServerSocketChannel) selectionKey.channel(); System.out.println("已连接"); SocketChannel socketChannel = serverSocket.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); // 读事件 iterator.remove(); } catch (IOException e) { } } else if (selectionKey.isReadable()) { // 处理read事件 // 获取socketChannel,实际上这个channel就是上面注册进selector的对象 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(100); try{ int read = socketChannel.read(byteBuffer); System.out.println("read:"+read); }catch (Exception e){ // e.printStackTrace(); continue; //一定要这样写。。。。。。。防止多次read报错 } byteBuffer.flip(); debugAll(byteBuffer); byteBuffer.clear(); iterator.remove(); } } } } catch (Exception e) { e.printStackTrace(); }

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发

事件处理之后一定要把selector.selectedKeys这个集合中当前处理完成的事件remove掉

零拷贝指的是数据无需拷贝到JVM内存中,同时具有以下三个优点:

更少的用户态与内核态的切换

不利用cpu计算,减少cpu缓存伪共享

零拷贝适合小文件传输

使用DirectByteBuffer

ByteBuffer.allocate(10)底层对应 HeapByteBuffer,使用的还是Java堆内存

ByteBuffer.allocateDirect(10)底层对应DirectByteBuffer,使用的是操作系统内存,不过需要手动释放内存

优点:

减少了一次数据拷贝,用户态与内核态的切换次数没有减少

这块内存不受 JVM 垃圾回收的影响,因此内存地址固定,有助于 IO 读写

Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU

只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗

使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 CPU

整个过程仅只发生了1次用户态与内核态的切换,数据拷贝了 2 次

Java 任务调度

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:一文教你快速了解并安装IntelliJ IDEA及其目录介绍
下一篇:安装Redis就那么几步,很简单!
相关文章