NIO编程

NIO是JDK1.4进入的非阻塞IO。NIO弥补了原来BIO的不足。

与NIO相关的几个概念

1.缓冲区Buffer
Buffer是一个对象,包含一些要 写入或读出的数据。在NIO库中,所有数据都是经过缓冲区处理的。在读取数据时,它是直接读取到缓冲区的;在写入数据时,写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。

缓冲区本质上是一个数组,通常是一个字节数组(ByteBuffer),也可以使用其他种类的数组。但是缓冲区不仅是一个数组,它提供了对数据的结构化访问以及维护读写位置等信息。

最常用的是ByteBuffer,它提供了一组功能用于操作byte数组。

2.通道Channel
Channel是一个通道,可以通过它来读取和写入数据。通道是双向的,流只在一个方向移动,而且通道可以同时用于读取和写入。
因为Channel是全双工的,所以可以更好的映射底层操作系统的api。

3.多路复用器Selector
Selector会不断轮询注册在它上面的Channel,如果某个Channel上有新的TCP连接、读和写事件,这个Channel就处于就绪状态。会被Selector轮询出来,然后通过SelectionKey可以获取就绪的Channel的集合,进行后续的I/O操作。
一个多路复用器可以同时轮询多个Channel,由于JDK使用epool替代了传统的select实现,所以它并没有最大连接句柄1024/2048的限制。这也就意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。

服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/**
* @author j.tommy
* @date 2017/11/11
*/
public class NioTimeServer {
public static void main(String[] args) {
NioTimeServer nts = new NioTimeServer();
// 创建Reactor线程,并启动
TimeServerHandler tsh = nts.new TimeServerHandler(8080);
new Thread(tsh,"Nio-Time-Server").start();
}
class TimeServerHandler implements Runnable {
private volatile boolean stop = false;
Selector selector = null;
ServerSocketChannel serverSocketChannel = null;
/**
* 初始化多路复用器,绑定监听端口
* @param port
*/
public TimeServerHandler(int port) {
try {
// 创建多路复用器
selector = Selector.open();
// 打开ServerSocketChannel,监听客户端连接
serverSocketChannel = ServerSocketChannel.open();
// 设置连接为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 绑定监听端口
serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
// 将ServerSocketChannel注册到Reactor线程的多路复用器上,并监听OP_ACCET事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start at port:" + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
// 循环遍历selector,休眠时间为1S,无论是否有读写事件发生,selector每隔1S被唤醒1次。
while (!stop) {
try {
// 选择一组准备就绪的Key
selector.select(1000);
// 遍历准备就绪的Key
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> itKeys = keys.iterator();
while (itKeys.hasNext()) {
SelectionKey k = itKeys.next();
itKeys.remove();
try {
handleKey(k);
} catch (Exception e) {
if (k != null) {
k.cancel();
if (k.channel() != null) {
k.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 关闭多路复用器,所有注册在上面的Channel和Pipe等资源会自动去注册并关闭,不需要重复释放资源
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleKey(SelectionKey key) throws IOException {
if (key.isValid()) {
if (key.isAcceptable()) {
// 多路复用器监听到新的客户端接入,处理新的接入请求
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
// 设置客户端链路为非阻塞模式
sc.configureBlocking(false);
// 将新接入的客户端注册到Reactor线程的多路复用器上,并监听读操作,用来读取客户端发送的消息
sc.register(selector,SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// 异步读取客户端消息到缓冲区
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
// 重置缓冲区的position,将limit设置为postion,position设置为0.用于后续的读取操作
readBuffer.flip();
// 创建一个容量为缓冲区可读字节个数的字节数组。=limit - postion。本例中position=0,所以实际上=limit
byte[] bytes = new byte[readBuffer.remaining()];
// 将缓冲区的可读的数据复制到字节数组中
readBuffer.get(bytes);
// 根据字节数组,构造一个字符串
String body = new String(bytes,"UTF-8");
System.out.println("服务器接收到客户端请求:" + body);
// 如果客户端发送的是Server Time的请求,则应答
if (body.equalsIgnoreCase("Server Time")) {
doWrite(socketChannel,(new Date()).toString());
}
}
else if (readBytes < 0) { // 客户端链路关闭
key.cancel();
socketChannel.close();
}
else { // 没有读取到数据,不处理
}
}
}
}
/**
* 向客户端写入当前时间
* @param sc
* @param response
* @throws IOException
*/
private void doWrite(SocketChannel sc , String response) throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
// 构造一个缓冲区,缓冲区的大小=要发送给客户端的数据的字节长度
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
// 将数据复制到缓冲区
writeBuffer.put(bytes);
// 重置缓冲区postion
writeBuffer.flip();
// 将消息异步发送到客户端
sc.write(writeBuffer);
}
}
}
}

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/**
* @author j.tommy
* @date 2017/11/11
*/
public class NioTimeClient {
public static void main(String[] args) {
NioTimeClient ntc = new NioTimeClient();
TimeClientHandler tch = ntc.new TimeClientHandler("127.0.0.1",8080);
// 创建Reactor线程并启动
new Thread(tch,"Nio Time Client").start();
}
class TimeClientHandler implements Runnable {
private String host;
private int port;
private volatile boolean stop = false;
private Selector selector = null;
private SocketChannel socketChannel = null;
public TimeClientHandler(String host,int port) {
this.host = host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
System.exit(1);
}
}
@Override
public void run() {
try {
doConnect();
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
System.out.println("开始轮休多路复用器已准备就绪的key");
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> itKeys = keys.iterator();
while (itKeys.hasNext()) {
SelectionKey sk = itKeys.next();
itKeys.remove();
try {
handleInput(sk);
} catch (IOException e) {
if (sk != null) {
sk.cancel();
}
if (sk.channel() != null) {
sk.channel().close();
}
}
}
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey sk) throws IOException {
if (sk.isValid()) {
SocketChannel sc = (SocketChannel) sk.channel();
if (sk.isConnectable()) {
if (sc.finishConnect()) {
sc.register(selector,SelectionKey.OP_READ);
doWrite(sc);
}
else {
System.exit(1);
}
}
if (sk.isReadable()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(byteBuffer);
if (readBytes > 0) {
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String response = new String(bytes,"UTF-8");
System.out.println("Server response:" + response);
this.stop = true;
}
else if (readBytes < 0) {
sk.cancel();
sc.close();
}
else {
}
}
}
}
private void doConnect() throws IOException {
boolean connected = socketChannel.connect(new InetSocketAddress(host,port));
if (connected) {
socketChannel.register(selector,SelectionKey.OP_READ);
doWrite(socketChannel);
}
else {
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel sc) throws IOException {
byte[] data = "Server Time".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(data.length);
writeBuffer.put(data);
writeBuffer.flip();
sc.write(writeBuffer);
if (!writeBuffer.hasRemaining()) {
System.out.println("Server Time请求发送成功!");
}
}
}
}

这里的代码并没有考虑“半包读”和“半包写”,如果加上这些,相比BIO难度要大很多。

实用NIO编程的优点:
1.客户端发起的连接操作时异步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要向BIO的客户端那样被同步阻塞。
2.SocketChannel的读写操作都是异步的,如果没有可写数据,它不会同步等待,直接返回。这样I/O线程可以处理其他的链路。
3.线程模型的优化:由于JDK的selector在Linux等主流的操作系统中通过epoll实现,它没有连接句柄数的限制,这样一个Selector线程就可以同时处理成千上万的客户端连接,而且性能不会随着客户端的增加而线性下降。因此,它非常适合做高性能、高负载的网络服务器。

参考《Netty权威指南》

Donny wechat
欢迎关注我的个人公众号
打赏,是超越赞的一种表达。
Show comments from Gitment