NIO2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。异步通道提供2种方式获取操作结果。

1.通过java.util.concurrent.Future来获取异步操作的结果;
2.在异步操作的时候传入一个java.nio.channels。
CompletionHandler作为异步完成的回调。

NIO2.0的异步套接字通道是真正的异步非阻塞I/O,它对应UNIX网络编程中的事件驱动I/O(AIO),它不需要通过多路复用器轮询注册的通道即可实现异步读写,从而简化了NIO的编程模型。

AIO版本的时间服务器代码
服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public class TimeServer {
    public static void main(String[] args) {
        TimeServer ts = new TimeServer();
        AsyncTimeServerHandler atsh = ts.new AsyncTimeServerHandler(9999);
        // 可以不使用线程
        new Thread(atsh,"AsyncTimeServerHandler").start();
    }
    class AsyncTimeServerHandler implements Runnable {
        private AsynchronousServerSocketChannel asynchronousServerSocketChannel;
        private CountDownLatch latch = null;
        public AsyncTimeServerHandler(int port) {
            try {
                // 创建一个异步的服务端通道
                asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
                // 绑定监听端口
                asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
                System.out.println("TimeServer is start at:" + port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void run() {
            // 使用CountDownLatch来让服务端在操作完成才退出。
            latch = new CountDownLatch(1);
            doAccept();
            try {
                latch.await(); // 阻塞
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        private void doAccept() {
            // 接收客户端的连接,使用CompletionHandler来接收accept操作成功的通知消息
            asynchronousServerSocketChannel.accept(this,new AccpetCompleteHandler());
        }
    }
    class AccpetCompleteHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> {
        @Override
        public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
            // 继续接收其他的客户端连接
            attachment.asynchronousServerSocketChannel.accept(attachment,this);
            // 客户端连接成功后,使用ReadCompletionHandler读取客户端发送的信息,将信息读取到byteBuffer。
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            result.read(byteBuffer,byteBuffer,new ReadCompleteHandler(result));
        }
        @Override
        public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
            exc.printStackTrace();
            attachment.latch.countDown();
        }
    }
    class ReadCompleteHandler implements CompletionHandler<Integer,ByteBuffer> {
        private AsynchronousSocketChannel channel;
        public ReadCompleteHandler(AsynchronousSocketChannel channel) {
            if (this.channel == null) {
                this.channel = channel;
            }
        }
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            // 为后续的从缓冲区读取数据做准备
            attachment.flip();
            // 根据缓冲区的可读字节数创建数组
            byte[] bytes = new byte[attachment.remaining()];
            // 将缓冲区的可读数据读取到bytes数组
            attachment.get(bytes);
            try {
                String body = new String(bytes,"UTF-8");
                System.out.println("接收到客户端请求:" + body);
                // 如果客户端发送的是Server Time,则将服务器当前时间发送给客户端。
                if ("Server Time".equalsIgnoreCase(body)) {
                    doWrite((new Date()).toString());
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        private void doWrite(String response) {
            byte[] bytes = response.getBytes();
            // 根据响应内容的大小构建缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
            // 将相应内容复制到缓冲区
            buffer.put(bytes);
            // 为后续从缓冲区读取数据准备
            buffer.flip();
            // 异步write方法将缓冲区数据写出
            channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    // 如果缓冲区还有数据,继续发送
                    if (attachment.hasRemaining()) {
                        channel.write(attachment,attachment,this);
                    }
                }
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
public class TimeClient {
    public static void main(String[] args) {
        TimeClient tc = new TimeClient();
        AsyncTimeClientHandler tch = tc.new AsyncTimeClientHandler("127.0.0.1",9999);
        new Thread(tch,"TimeClientHandler").start();
    }
    class AsyncTimeClientHandler implements CompletionHandler<Void,AsyncTimeClientHandler>,Runnable {
        private String host;
        private int port;
        private CountDownLatch latch ;
        private AsynchronousSocketChannel client;
        public AsyncTimeClientHandler(String host,int port) {
            this.host = host;
            this.port = port;
            try {
                client = AsynchronousSocketChannel.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void run() {
            // 使用CountDownLatch,防止异步操作还未完成就退出了。
            latch = new CountDownLatch(1);
            // 异步连接服务端
            client.connect(new InetSocketAddress(host,port),this,this);
            try {
                latch.await(); // 阻塞
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                client.close(); // 操作完成后,关闭通道
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void completed(Void result, AsyncTimeClientHandler attachment) {
            // 要向服务器发送的数据
            byte[] data = "Server Time".getBytes();
            final ByteBuffer buffer = ByteBuffer.allocate(data.length);
            buffer.put(data);
            buffer.flip();
            // 异步写消息给服务端
            client.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    // 如果有消息还没有发送完毕,则继续发送
                    if (buffer.hasRemaining()) {
                        client.write(attachment,attachment,this);
                    }
                    else {
                        // 消息发送完成后,异步读取服务器响应
                        // 预分配空间为1K
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                            @Override
                            public void completed(Integer result, ByteBuffer attachment) {
                                // 为后续从缓冲区读取数据做准备
                                attachment.flip();
                                // 根据缓冲区可读字节的长度构建字节数组
                                byte[] data = new byte[attachment.remaining()];
                                // 将缓冲区数据复制到data数组
                                attachment.get(data);
                                try {
                                    String body = new String(data,"UTF-8");
                                    System.out.println("接收到服务端消息:" + body);
                                    // 这里模拟只请求服务器1次,完成后退出。
                                    latch.countDown();
                                } catch (UnsupportedEncodingException e) {
                                    e.printStackTrace();
                                }
                            }
                            @Override
                            public void failed(Throwable exc, ByteBuffer attachment) {
                                try {
                                    client.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                                latch.countDown();
                            }
                        });
                    }
                }
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    latch.countDown();
                }
            });
        }
        @Override
        public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            latch.countDown();
        }
    }
}

JDK底层通过线程池ThreadPoolExecutor来执行回调通知。AsynchronousServerSocketChannel和AsynchronousSocketChannel,他们都由JDK底层的线程池回调并驱动读写操作。

参考《Netty权威指南》