AIO(Asynchronous IO/异步非阻塞IO)概述:
是NIO的2.0版本,AIO是jdk1.7之后提供的,需要借助于本地操作系统(unix/liunx提供的异步IO模型)。
AIO最大的一个特性就是异步能力,这种能力对socket与文件IO都起作用。AIO其实是一种在读写操作结束之前允许进行其它操作的IO处理,AIO是对jdk1.4中提出的同步非阻塞NIO的进一步增强。
AIO对应IO模型
jdk1.7主要增加了四个新的异步通道:
- AsynchronousFileChannel: 用于文件异步读写
- AsynchronousSocketChannl:客户端异步socket
- AsynchronousServerSocketChannel: 服务器异步socket
- AsynchronousDatagramChannel: UDP异步socket
AIO的实施需要充分使用OS(网络)参与,IO操作需要操作系统支持,并发也同样需要操作系统的支持,所以性能方面不同操作系统差异比较明显。
当进行读写操作时,只需调用API的read/write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序; 对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。可以理解为 read/write方法都是异步的,完成后会主动调用回调函数 。
AIO的特点
- 读完了在通知我
- 不会加快IO,只是在读完后进行通知
- 使用回调函数,进行业务处理。
异步的处理:
异步就是通知系统做一件事情。然后忘掉它,自己去做其它事情了。很多时候系统做完某一件事情后需要一些后续的操作。那么我们就需要告诉异步调用如何做后续处理。通常有两种方式:
- 将来式:将你希望主线程发起异步调用,并轮询等待结果的时候使用将来式;
- 回调式:异步回调函数。
以文件读取为例:
将来式(Future):
Future异步读取:
Future用现有的java.util.concurrent包中的Future用来保存异步操作的处理结果。通常Future.get()方法在异步IO操作完成时获取其结果。
AsynchronousFileChannel会关联线程池,它的任务是接收IO处理事件,并分发给负责处理通道中IO操作的处理器。跟通道中发起的IO操作关联的结果处理器确保是由线程池中某个线程产生。
future举例:
package Internet.AIODemo;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
class AIOFile implements Runnable{
String src;
public AIOFile(String src){
this.src=src;
}
@Override
public void run() {
Path path= Paths.get(src);
try {
AsynchronousFileChannel channel=AsynchronousFileChannel.open(path);
ByteBuffer buffer=ByteBuffer.allocate(1024);
Future<Integer> future=channel.read(buffer,0);
Integer readNumber=future.get();
buffer.flip();
byte [] bytes=new byte[buffer.remaining()];
buffer.get(bytes);
System.out.println(new String(bytes,"UTF-8"));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public class AIOFileChan {
public static void main(String[] args) {
Executor executors=Executors.newSingleThreadExecutor();
executors.execute(new AIOFile("E:\\javaSource2\\javacod\\20181121\\fos.txt"));
}
}
回调式:
回调式异步读取:
回调式所采用的事件处理技术类似于Swing UI编程采用的机制。基本思想是主线程会派一个侦察员CompletionHandler到独立的线程中执行IO成操作。这个侦察员将带着IO操作的结果返回到主线程中,这个结果会触发它自己的Completed或者failed方法(需要重写)。在异步IO结束后,接口java.nio.channels.CompletionHandler会被调用,其中V是结果类型,A是提供结果的附着对象。 此时必须已经有该接口completed(V,A)和failed(V,A) 方法的实现,你的程序才能知道异步IO操作成功或失败该如何处理。
回调式举例:
package Internet.AIODemo;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
class AIOFile implements Runnable{
String src;
public AIOFile(String src){
this.src=src;
}
@Override
public void run() {
Path path= Paths.get(src);
try {
AsynchronousFileChannel channel=AsynchronousFileChannel.open(path);
ByteBuffer buffer=ByteBuffer.allocate(1024);
channel.read(buffer,0,buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] bytes=new byte[attachment.remaining()];
attachment.get(bytes);
try {
System.out.println(new String(bytes,"UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
//线程结束,回调函数就会失效,防止回调函数失效
while(true){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class AIOFileChan {
public static void main(String[] args) {
Executor executors=Executors.newSingleThreadExecutor();
executors.execute(new AIOFile("E:\\javaSource2\\javacod\\20181121\\fos.txt"));
}
}
AIO网络通信:
- AsynchronousServerSocketChannel:AIO网络通信服务端Socket
accept方法:
AsynchronousServerSocketChannel创建成功后,类似于ServerSocket也是调用accept方法来接收客户端的连接。
由于异步IO实际上的IO操作是交给操作系统来做的,用户进程只负责通知操作系统进行IO和接收操作系统IO完成的通知。
所以异步的ServerChannel调用accept方法后,当前线程不会阻塞,程序也不知道accept()方法什么时候才能够接收到客户端请求并且操作系统完成网络IO。
为解决这个问题,AIO为accept方法提供两个版本:
- Future< AsynchronousSocketChannel> accept:
开始接收来自客户端请求,如果当前线程需要进行网络IO(即获得AsynchronousSocketChannel) ,则应该调用该方法返回的Future对象的get()方法,但是get() 方法会阻塞该线程,所以这种方式是阻塞式的异步IO。 - < A > void accept(A attachent,CompletionHandle< AsynchronousSocketChannel , ? super A> handler ):
开始接收来自客户端请求,连接成功或失败都会触发CompletionHandler对象的响应方法。
其中AsynchronousSocketChannel就代表该CompletionHandler处理器在处理连接成功时的result是AsynchronousSocketChannel的实例。
而CompletionHandler接口中定义了两个方法, - completed(V result , A attachment): 当IO完成时触发该方法,该方法的第一个参数代表IO操作返回的对象,
第二个参数代表发起IO操作时传入的附加参数。 - faild(Throwable exc, A attachment): 当IO失败时触发该方法,第一个参数代表IO操作失败引发的异常或错误。
BIO、NIO、AIO应用场景
- BIO适合并发量固定的业务请求
- NIO适合并发量高,业务逻辑简单(轻量级)的场景,如聊天
- AIO适合并发量高,业务逻辑复杂(重量级)的场景。
AIO网络编程实例:
客户端:
- 客户端启动类:
package Internet.AIODemo;
public class AIOClient {
public static void main(String[] args) {
new Thread(new ClientHandler("127.0.0.1",8888)).start();
}
}
- 客户端业务逻辑类:
package Internet.AIODemo;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
public class ClientHandler implements Runnable{
private AsynchronousSocketChannel clientChannel;
private String host;
private int port;
private CountDownLatch latch;
public ClientHandler(String host,int port){
this.host=host;
this.port=port;
try {
this.clientChannel=AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//创建CountDownLatch等待
this.latch=new CountDownLatch(1);
//异步操作,连接服务端
clientChannel.connect(new InetSocketAddress(host,port),//地址传入
this, //给回调函数传入的数据,也是回调函数第二个参数类型
new ConnectHandler());
Scanner scannel=new Scanner(System.in);
String msg=null;
while(true){
msg=scannel.nextLine();
if("exit".equals(msg))
break;
sendMsg(msg);
}
try {
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public void sendMsg(String msg){
byte [] bytes=msg.getBytes();
ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length);
System.out.println("msg:"+msg+",length:"+bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();//读写切换
//异步写
clientChannel.write(writeBuffer, //要写的参数
writeBuffer,//往回调函数传入的参数
new WriteHandler(clientChannel));
ByteBuffer readBuffer=ByteBuffer.allocate(1024);
//也可以在write里面的else中使用read
clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel));
}
//连接类
class ConnectHandler implements CompletionHandler<Void,//connect操作不需要返回
ClientHandler>{ //传入参数类型
@Override
public void completed(Void result, ClientHandler attachment) {
System.out.println("服务器连接成功");
}
@Override
public void failed(Throwable exc, ClientHandler attachment) {
exc.printStackTrace();
try {
attachment.clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class WriteHandler implements CompletionHandler<Integer,ByteBuffer>{
private AsynchronousSocketChannel channel;
public WriteHandler(AsynchronousSocketChannel channel){
this.channel=channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
//完成全部数据写入
if(attachment.hasRemaining()){
channel.write(attachment,attachment,this);
}//else{
//读取数据
//这里的代码可以用,可以不用创建新的buffer。
// ByteBuffer buffer=ByteBuffer.allocate(1024);
// channel.read(buffer,buffer,new ReadHandler(channel));
// }
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class ReadHandler implements CompletionHandler<Integer,ByteBuffer>{
private AsynchronousSocketChannel channel;
public ReadHandler(AsynchronousSocketChannel channel){
this.channel=channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
// attachment.flip();//内核完成向缓存写入
byte[] bytes=new byte[attachment.remaining()];
attachment.get(bytes);
String body;
try {
body = new String(bytes, "UTF-8");
System.out.println("客户端收到结果: "+body);
}catch (UnsupportedEncodingException e){
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("数据读取失败");
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
服务端:
- 服务端启动类:
package Internet.AIODemo;
public class AIOServer {
private static int DEFAULT_PORT=12345;
private static ServerHandler serverHandler;
private volatile static long clientCount=0;
public static synchronized void start(int port){
if(serverHandler!=null){
return;
}
serverHandler=new ServerHandler(port);
new Thread(serverHandler,"Server").start();
}
public static void main(String[] args) {
AIOServer.start(8888);
}
}
- 服务端业务逻辑类:
package Internet.AIODemo;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
public class ServerHandler implements Runnable{
private AsynchronousServerSocketChannel channel;
public ServerHandler(int port){
try {
channel=AsynchronousServerSocketChannel.open();
channel.bind(new InetSocketAddress(port));
System.out.println("服务端已启动,端口号:"+port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//channel.accept(this,new AcceptHandler());
//Future<AsynchronousSocketChannel> accept=channel.accept();
//该步操作为异步操作,防止当前线程直接执行结束
//方案一:while(true)+sleep
//因为我们将accept操作交给回调函数执行,如果主流程没有其它业务
//那么线程就会结束,这样回调函数就会出现问题。不结束那么回调函数就还有效
// while(true){
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
//方案二 :CountDownLatch作用:在完成一组正在执行的操作之前,允许当前
//的线程一直阻塞此处,让线程在此阻塞,防止服务端执行完成后退出。
CountDownLatch count=new CountDownLatch(1);
channel.accept(this,new AcceptHandler());
try {
count.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//CompletionHandler<V,A>
//V-IO操作的结果,这里标识返回的是成功建立的连接,AsynchronousSocketChannel
//A-IO操作附件,这里传入synchronousServerSocketChannel 便于继续接收请求。
class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,ServerHandler>{
@Override
public void completed(AsynchronousSocketChannel channel, ServerHandler serverHandler) {
ByteBuffer buffer=ByteBuffer.allocate(1024);
//异步读,第三个参数为接收消息回调的业务handle
channel.read(buffer,//读事件需要将数据写入这个buffer
buffer,//给回调函数传入的参数,
new ReadHandler(channel));
//继续接收其它客户端的请求。
//第一个参数依然需要传serverHandler,如果传null时,多个客户端连接会抛异常,但是不影响连接
serverHandler.channel.accept(serverHandler,this);
}
@Override
public void failed(Throwable exc, ServerHandler attachment) {
exc.printStackTrace();
}
}
class ReadHandler implements CompletionHandler<Integer //声明read操作返回的类型(读数据个数)
,ByteBuffer>{ //调用读操作传入的类型,针对read函数第一个buffer
//用户读取信息或者发送信息的channel
private AsynchronousSocketChannel channel;
public ReadHandler(AsynchronousSocketChannel channel){
this.channel=channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();//内核已经帮我们把数据写到buffer,
// 现在进行读写切换将数据从buffer中读出来
byte[] msg=new byte[attachment.remaining()];
attachment.get(msg);
try {
String expression=new String(msg,"UTF-8");
if("exit".equals(expression)){
//channel.close();
throw new IOException("关闭服务端");
}
System.out.println("服务器收到消息:"+expression);
//向客户端发送消息
doWrite(expression);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
private void doWrite(String msg){
byte[] bytes=msg.getBytes();
ByteBuffer buffer=ByteBuffer.allocate(bytes.length);
System.out.println("buffer.length:"+bytes.length);
buffer.put(bytes);
buffer.flip();
//异步写数据
channel.write(buffer, //从哪里拿数据
buffer, //回调函数传入的参数
//回调函数
new CompletionHandler<Integer, //写完成后返回的参数类型
ByteBuffer>() { //往回调函数传入的参数类型
// 因为第二个参数传入的是ByteBuffer类型
@Override
public void completed(Integer result, ByteBuffer attachment) {
//如果没有发送完则继续发送。
if(attachment.hasRemaining()){
channel.write(attachment,attachment,this);
}else{
//异步读,
ByteBuffer allocate=ByteBuffer.allocate(1024);
//attachment.clear();
attachment.clear();
//也可以直接传入attachment
channel.read(allocate,allocate,new ReadHandler(channel));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}