网络-java中AIO(异步非阻塞)理解与应用

AIO(Asynchronous IO/异步非阻塞IO)概述:

是NIO的2.0版本,AIO是jdk1.7之后提供的,需要借助于本地操作系统(unix/liunx提供的异步IO模型)。
AIO最大的一个特性就是异步能力,这种能力对socket与文件IO都起作用。AIO其实是一种在读写操作结束之前允许进行其它操作的IO处理,AIO是对jdk1.4中提出的同步非阻塞NIO的进一步增强。

AIO对应IO模型

《网络-java中AIO(异步非阻塞)理解与应用》

jdk1.7主要增加了四个新的异步通道:
  • AsynchronousFileChannel: 用于文件异步读写
  • AsynchronousSocketChannl:客户端异步socket
  • AsynchronousServerSocketChannel: 服务器异步socket
  • AsynchronousDatagramChannel: UDP异步socket
    AIO的实施需要充分使用OS(网络)参与,IO操作需要操作系统支持,并发也同样需要操作系统的支持,所以性能方面不同操作系统差异比较明显。
    当进行读写操作时,只需调用API的read/write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序; 对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。可以理解为 read/write方法都是异步的,完成后会主动调用回调函数

AIO的特点

  • 读完了在通知我
  • 不会加快IO,只是在读完后进行通知
  • 使用回调函数,进行业务处理。

异步的处理:

异步就是通知系统做一件事情。然后忘掉它,自己去做其它事情了。很多时候系统做完某一件事情后需要一些后续的操作。那么我们就需要告诉异步调用如何做后续处理。通常有两种方式:

  • 将来式:将你希望主线程发起异步调用,并轮询等待结果的时候使用将来式;
  • 回调式:异步回调函数。
    以文件读取为例:
将来式(Future):

《网络-java中AIO(异步非阻塞)理解与应用》

Future异步读取:

Future用现有的java.util.concurrent包中的Future用来保存异步操作的处理结果。通常Future.get()方法在异步IO操作完成时获取其结果。
AsynchronousFileChannel会关联线程池,它的任务是接收IO处理事件,并分发给负责处理通道中IO操作的处理器。跟通道中发起的IO操作关联的结果处理器确保是由线程池中某个线程产生。

future举例:
package Internet.AIODemo;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AIOFile implements Runnable{ 
    String src;
    public AIOFile(String src){ 
        this.src=src;
    }
    @Override
    public void run() { 
        Path path= Paths.get(src);
        try { 
            AsynchronousFileChannel channel=AsynchronousFileChannel.open(path);
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            Future<Integer> future=channel.read(buffer,0);
            Integer readNumber=future.get();
            buffer.flip();
            byte [] bytes=new byte[buffer.remaining()];
            buffer.get(bytes);
            System.out.println(new String(bytes,"UTF-8"));
        } catch (IOException e) { 
            e.printStackTrace();
        } catch (InterruptedException e) { 
            e.printStackTrace();
        } catch (ExecutionException e) { 
            e.printStackTrace();
        }
    }
}
public class AIOFileChan { 
    public static void main(String[] args) { 
        Executor executors=Executors.newSingleThreadExecutor();
        executors.execute(new AIOFile("E:\\javaSource2\\javacod\\20181121\\fos.txt"));
    }
}

回调式:

《网络-java中AIO(异步非阻塞)理解与应用》

回调式异步读取:

回调式所采用的事件处理技术类似于Swing UI编程采用的机制。基本思想是主线程会派一个侦察员CompletionHandler到独立的线程中执行IO成操作。这个侦察员将带着IO操作的结果返回到主线程中,这个结果会触发它自己的Completed或者failed方法(需要重写)。在异步IO结束后,接口java.nio.channels.CompletionHandler会被调用,其中V是结果类型,A是提供结果的附着对象。 此时必须已经有该接口completed(V,A)和failed(V,A) 方法的实现,你的程序才能知道异步IO操作成功或失败该如何处理。

回调式举例:
package Internet.AIODemo;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AIOFile implements Runnable{ 
    String src;
    public AIOFile(String src){ 
        this.src=src;
    }
    @Override
    public void run() { 
        Path path= Paths.get(src);
        try { 
            AsynchronousFileChannel channel=AsynchronousFileChannel.open(path);
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            channel.read(buffer,0,buffer, new CompletionHandler<Integer, ByteBuffer>() { 
                @Override
                public void completed(Integer result, ByteBuffer attachment) { 
                     attachment.flip();
                     byte[] bytes=new byte[attachment.remaining()];
                     attachment.get(bytes);
                    try { 
                        System.out.println(new String(bytes,"UTF-8"));
                    } catch (UnsupportedEncodingException e) { 
                        e.printStackTrace();
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) { 
                        exc.printStackTrace();
                }
            });

        } catch (IOException e) { 
            e.printStackTrace();
        }
        //线程结束,回调函数就会失效,防止回调函数失效
        while(true){ 
            try { 
                Thread.sleep(100);
            } catch (InterruptedException e) { 
                e.printStackTrace();
            }
        }
    }
}
public class AIOFileChan { 
    public static void main(String[] args) { 
        Executor executors=Executors.newSingleThreadExecutor();
        executors.execute(new AIOFile("E:\\javaSource2\\javacod\\20181121\\fos.txt"));
    }
}

AIO网络通信:

  • AsynchronousServerSocketChannel:AIO网络通信服务端Socket
accept方法:

AsynchronousServerSocketChannel创建成功后,类似于ServerSocket也是调用accept方法来接收客户端的连接。
由于异步IO实际上的IO操作是交给操作系统来做的,用户进程只负责通知操作系统进行IO和接收操作系统IO完成的通知。
所以异步的ServerChannel调用accept方法后,当前线程不会阻塞,程序也不知道accept()方法什么时候才能够接收到客户端请求并且操作系统完成网络IO。
为解决这个问题,AIO为accept方法提供两个版本:

  • Future< AsynchronousSocketChannel> accept:
    开始接收来自客户端请求,如果当前线程需要进行网络IO(即获得AsynchronousSocketChannel) ,则应该调用该方法返回的Future对象的get()方法,但是get() 方法会阻塞该线程,所以这种方式是阻塞式的异步IO。
  • < A > void accept(A attachent,CompletionHandle< AsynchronousSocketChannel , ? super A> handler ):
    开始接收来自客户端请求,连接成功或失败都会触发CompletionHandler对象的响应方法。
    其中AsynchronousSocketChannel就代表该CompletionHandler处理器在处理连接成功时的result是AsynchronousSocketChannel的实例。
    而CompletionHandler接口中定义了两个方法,
  • completed(V result , A attachment): 当IO完成时触发该方法,该方法的第一个参数代表IO操作返回的对象,
    第二个参数代表发起IO操作时传入的附加参数。
  • faild(Throwable exc, A attachment): 当IO失败时触发该方法,第一个参数代表IO操作失败引发的异常或错误。

BIO、NIO、AIO应用场景

  • BIO适合并发量固定的业务请求
  • NIO适合并发量高,业务逻辑简单(轻量级)的场景,如聊天
  • AIO适合并发量高,业务逻辑复杂(重量级)的场景。

AIO网络编程实例:

客户端:
  • 客户端启动类:
package Internet.AIODemo;

public class AIOClient { 
    public static void main(String[] args) { 
        new Thread(new ClientHandler("127.0.0.1",8888)).start();
    }
}

  • 客户端业务逻辑类:
package Internet.AIODemo;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;

public class ClientHandler implements Runnable{ 
    private AsynchronousSocketChannel clientChannel;
    private String host;
    private int port;
    private CountDownLatch latch;
    public ClientHandler(String host,int port){ 
        this.host=host;
        this.port=port;
        try { 
            this.clientChannel=AsynchronousSocketChannel.open();

        } catch (IOException e) { 
            e.printStackTrace();
        }
    }

    @Override
    public void run() { 
        //创建CountDownLatch等待
        this.latch=new CountDownLatch(1);
        //异步操作,连接服务端
        clientChannel.connect(new InetSocketAddress(host,port),//地址传入
                this, //给回调函数传入的数据,也是回调函数第二个参数类型
                new ConnectHandler());
        Scanner scannel=new Scanner(System.in);
        String msg=null;
        while(true){ 
            msg=scannel.nextLine();
            if("exit".equals(msg))
                break;
            sendMsg(msg);
        }
        try { 
            clientChannel.close();
        } catch (IOException e) { 
            e.printStackTrace();
        }
    }
    public void sendMsg(String msg){ 
        byte [] bytes=msg.getBytes();
        ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length);
        System.out.println("msg:"+msg+",length:"+bytes.length);
        writeBuffer.put(bytes);
        writeBuffer.flip();//读写切换
        //异步写
        clientChannel.write(writeBuffer, //要写的参数
                writeBuffer,//往回调函数传入的参数
                new WriteHandler(clientChannel));
        ByteBuffer readBuffer=ByteBuffer.allocate(1024);
        //也可以在write里面的else中使用read
        clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel));
    }
    //连接类
    class ConnectHandler implements CompletionHandler<Void,//connect操作不需要返回
            ClientHandler>{  //传入参数类型
        @Override
        public void completed(Void result, ClientHandler attachment) { 
            System.out.println("服务器连接成功");
        }

        @Override
        public void failed(Throwable exc, ClientHandler attachment) { 
            exc.printStackTrace();
            try { 
                attachment.clientChannel.close();
            } catch (IOException e) { 
                e.printStackTrace();
            }
        }
    }

    class WriteHandler implements CompletionHandler<Integer,ByteBuffer>{ 
        private AsynchronousSocketChannel channel;
        public WriteHandler(AsynchronousSocketChannel channel){ 
            this.channel=channel;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) { 
            //完成全部数据写入
            if(attachment.hasRemaining()){ 
                channel.write(attachment,attachment,this);
         }//else{ 
 //读取数据
                 //这里的代码可以用,可以不用创建新的buffer。
// ByteBuffer buffer=ByteBuffer.allocate(1024);
// channel.read(buffer,buffer,new ReadHandler(channel));
// }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) { 
               exc.printStackTrace();
            try { 
                channel.close();
            } catch (IOException e) { 
                e.printStackTrace();
            }
        }
    }
    class ReadHandler implements CompletionHandler<Integer,ByteBuffer>{ 
        private AsynchronousSocketChannel channel;
        public ReadHandler(AsynchronousSocketChannel channel){ 
            this.channel=channel;
        }
        @Override
        public void completed(Integer result, ByteBuffer attachment) { 
// attachment.flip();//内核完成向缓存写入
            byte[] bytes=new byte[attachment.remaining()];
            attachment.get(bytes);
            String body;
            try { 
                body = new String(bytes, "UTF-8");
                System.out.println("客户端收到结果: "+body);
            }catch (UnsupportedEncodingException e){ 
                e.printStackTrace();
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) { 
            System.out.println("数据读取失败");
            try { 
                channel.close();
            } catch (IOException e) { 
                e.printStackTrace();
            }

        }
    }
}

服务端:
  • 服务端启动类:
package Internet.AIODemo;

public class AIOServer { 
    private static int  DEFAULT_PORT=12345;
    private static ServerHandler serverHandler;
    private volatile static long clientCount=0;
    public static synchronized void start(int port){ 
        if(serverHandler!=null){ 
            return;
        }
        serverHandler=new ServerHandler(port);
        new Thread(serverHandler,"Server").start();
    }

    public static void main(String[] args) { 
        AIOServer.start(8888);
    }
}

  • 服务端业务逻辑类:
package Internet.AIODemo;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

public class ServerHandler implements Runnable{ 
    private AsynchronousServerSocketChannel channel;
    public ServerHandler(int port){ 
        try { 
            channel=AsynchronousServerSocketChannel.open();
            channel.bind(new InetSocketAddress(port));
            System.out.println("服务端已启动,端口号:"+port);
        } catch (IOException e) { 
            e.printStackTrace();
        }
    }
    @Override
    public void run() { 
        //channel.accept(this,new AcceptHandler());
        //Future<AsynchronousSocketChannel> accept=channel.accept();

        //该步操作为异步操作,防止当前线程直接执行结束
        //方案一:while(true)+sleep
        //因为我们将accept操作交给回调函数执行,如果主流程没有其它业务
        //那么线程就会结束,这样回调函数就会出现问题。不结束那么回调函数就还有效
// while(true){ 
// try { 
// Thread.sleep(1000);
// } catch (InterruptedException e) { 
// e.printStackTrace();
// }
// }

        //方案二 :CountDownLatch作用:在完成一组正在执行的操作之前,允许当前
        //的线程一直阻塞此处,让线程在此阻塞,防止服务端执行完成后退出。
        CountDownLatch count=new CountDownLatch(1);
        channel.accept(this,new AcceptHandler());
        try { 
            count.await();
        } catch (InterruptedException e) { 
            e.printStackTrace();
        }

    }
    //CompletionHandler<V,A>
    //V-IO操作的结果,这里标识返回的是成功建立的连接,AsynchronousSocketChannel
    //A-IO操作附件,这里传入synchronousServerSocketChannel 便于继续接收请求。
    class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,ServerHandler>{ 
        @Override
        public void completed(AsynchronousSocketChannel channel, ServerHandler serverHandler) { 
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            //异步读,第三个参数为接收消息回调的业务handle
            channel.read(buffer,//读事件需要将数据写入这个buffer
                    buffer,//给回调函数传入的参数,
                    new ReadHandler(channel));
            //继续接收其它客户端的请求。
            //第一个参数依然需要传serverHandler,如果传null时,多个客户端连接会抛异常,但是不影响连接
            serverHandler.channel.accept(serverHandler,this);
        }

        @Override
        public void failed(Throwable exc, ServerHandler attachment) { 
            exc.printStackTrace();
        }
    }
    class ReadHandler implements  CompletionHandler<Integer //声明read操作返回的类型(读数据个数)
            ,ByteBuffer>{ //调用读操作传入的类型,针对read函数第一个buffer
        //用户读取信息或者发送信息的channel
        private AsynchronousSocketChannel channel;
        public ReadHandler(AsynchronousSocketChannel channel){ 
            this.channel=channel;
        }
        @Override
        public void completed(Integer result, ByteBuffer attachment) { 
            attachment.flip();//内核已经帮我们把数据写到buffer,
            // 现在进行读写切换将数据从buffer中读出来
            byte[] msg=new byte[attachment.remaining()];
            attachment.get(msg);
            try { 
                String expression=new String(msg,"UTF-8");
                if("exit".equals(expression)){ 
                    //channel.close();
                    throw new IOException("关闭服务端");
                }
                System.out.println("服务器收到消息:"+expression);
                //向客户端发送消息
                doWrite(expression);
            } catch (UnsupportedEncodingException e) { 
                e.printStackTrace();
            } catch (IOException e) { 
                e.printStackTrace();
            }
        }
        private void doWrite(String msg){ 
            byte[] bytes=msg.getBytes();
            ByteBuffer buffer=ByteBuffer.allocate(bytes.length);
            System.out.println("buffer.length:"+bytes.length);
            buffer.put(bytes);
            buffer.flip();
            //异步写数据
            channel.write(buffer, //从哪里拿数据
                    buffer, //回调函数传入的参数
                    //回调函数
                    new CompletionHandler<Integer, //写完成后返回的参数类型
                            ByteBuffer>() {  //往回调函数传入的参数类型
                            // 因为第二个参数传入的是ByteBuffer类型
                @Override
                public void completed(Integer result, ByteBuffer attachment) { 
                    //如果没有发送完则继续发送。
                    if(attachment.hasRemaining()){ 
                        channel.write(attachment,attachment,this);
                    }else{ 
                        //异步读,
                        ByteBuffer allocate=ByteBuffer.allocate(1024);
                        //attachment.clear();
                        attachment.clear();
                        //也可以直接传入attachment
                        channel.read(allocate,allocate,new ReadHandler(channel));
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) { 
                    exc.printStackTrace();
                }
            });
        }
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) { 
            exc.printStackTrace();
            try { 
                channel.close();
            } catch (IOException e) { 
                e.printStackTrace();
            }
        }
    }
}
    原文作者:Fly_Fly_Zhang
    原文地址: https://blog.csdn.net/Fly_Fly_Zhang/article/details/91047678
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞