本篇文章与前几篇文章BIO编程、AIO编程、伪异步IO编程、NIO编程一起,作为对比的Netty实现,并未考虑TCP粘包/拆包的问题。
由阻塞I/O,伪异步I/O,非阻塞I/O,异步非阻塞I/O到Netty实现相同功能的代价,以及使用其他方式的弊端。

下面仍以时间服务器为例进行说明。

服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
 * @author j.tommy
 * @version 1.0
 * @date 2017/11/17
 */
public class TimeServer {
    public static void main(String[] args) {
        // 创建NIO线程组,用于服务端接收客户端请求
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 用户处理SocketChannel的网络读写
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 启动NIO服务端的辅助启动类
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class) // 设置线程的Channel
                .option(ChannelOption.SO_BACKLOG, 1024) // 设置NioServerSocketChannel的参数
                .childHandler(new ChildChannelHandler()); // 绑定I/O事件的处理类
        try {
            ChannelFuture f = b.bind(8808).sync(); // 绑定监听端口,并阻塞等待绑定操作完成
            f.channel().closeFuture().sync(); // 阻塞,等待服务端链路关闭
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully(); // 进行优雅退出,会释放跟shutdownGracefully相关联的资源
            workerGroup.shutdownGracefully();
        }
    }
}
class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new TimeServerHandler());
    }
}
class TimeServerHandler extends ChannelHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // ByteBuf类似于JDK中的ByteBuffer,但提供更强大跟灵活的功能
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()]; // 根据缓冲区可读字节数构建字节数组
        buf.readBytes(req); // 将缓冲区的直接数组复制到req
        String body = new String(req, "UTF-8");
        System.out.println("接收到客户端请求:" + body);
        // 如果接受到的消息时Server Time,则异步将服务端当前时间发送给客户端。
        if ("Server Time".equalsIgnoreCase(body)) {
            ByteBuf resp = Unpooled.copiedBuffer((new Date()).toString().getBytes());
            // 这里write方法只是将数据写入缓冲区,并没有真正发送
            ctx.write(resp);
        }
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 将缓冲区的数据写入SocketChannel
        ctx.flush();
    }
}

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
 * @author j.tommy
 * @version 1.0
 * @date 2017/11/17
 */
public class TimeClient {
    public static void main(String[] args) {
        // 构造NIO线程组
        NioEventLoopGroup group = new NioEventLoopGroup();
        // 辅助启动类
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class) // 设置线程的Channel
                .option(ChannelOption.TCP_NODELAY, true) // 设置NIOSocketChannel的参数
                .handler(new ChannelInitializer<SocketChannel>() { // 绑定I/O事件处理类
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new TimeClientHandler());
                    }
        });
        try {
            ChannelFuture f = b.connect("127.0.0.1", 8808).sync(); // 连接并等待连接成功
            f.channel().closeFuture().sync(); // 阻塞,等待客户端连接关闭
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully(); // 优雅退出
        }
    }
}
class TimeClientHandler extends ChannelHandlerAdapter {
    private ByteBuf msgSendBuf;
    public TimeClientHandler() {
        // 待发送数据
        String req = "Server Time";
        msgSendBuf = Unpooled.copiedBuffer(req.getBytes());
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 链路建立成功后,将Server Time请求发送给服务端
        ctx.writeAndFlush(msgSendBuf);
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 接收服务端响应
        ByteBuf buf = (ByteBuf) msg;
        byte[] resp = new byte[buf.readableBytes()];
        buf.readBytes(resp);
        String response = new String(resp, "UTF-8");
        System.out.println("接收到服务端响应:" + response);
    }
}

相比使用NIO开发简单了很多。

参考《Netty权威指南》