使用Netty解决TCP粘包-拆包问题

TCP粘包和拆包

TCP底层并不知道上层业务数据的具体含义,它会根据缓冲区的实际情况进行包的拆分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能将多个小的数据包封装成一个大的数据包发送,这就是所谓的TCP粘包/拆包问题。

TCP粘包/拆包的原因

1.应用程序写入的字节大小大于套接字发送缓冲区大小;
2.进行MSS大小的TCP分段;
3.以太网帧的payload大于MTU进行IP分片。

解决策略

由于底层的TCP协议无法理解上层业务数据,所以在底层是无法保证数据包不被拆分和重组的。只能通过上层的业务协议栈设计来解决,根据业务的主流 协议的解决方案,归纳为如下:
(1)消息定长,例如每个报文的大小定位200字节,如果不够,空位补空格;
(2)包结尾增加回车换行符进行分割,例如FTP协议;
(3)将消息分为消息头和消息体,消息头中包含标识消息的总长度(或者消息体长度)的字段,通常涉及思路是消息头的第一个字段使用int32来表示消息的总长度;
(4)更复杂的应用层协议。

未考虑TCP粘包导致的功能异常案例

服务端代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class TimeServerHandler extends ChannelHandlerAdapter {
private int counter = 0;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ByteBuf类似于JDK中的ByteBuffer,但提供更强大跟灵活的功能
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()]; // 根据缓冲区可读字节数构建字节数组
buf.readBytes(req); // 将缓冲区的直接数组复制到req
String body = new String(req, "UTF-8");
System.out.println("接收到客户端请求:" + body + ",counter:" + ++counter);
// 如果接受到的消息时Server Time,则异步将服务端当前时间发送给客户端。
if ("Server Time".equalsIgnoreCase(body)) {
ByteBuf resp = Unpooled.copiedBuffer((new Date()).toString().getBytes());
// 这里write方法只是将数据写入缓冲区,并没有真正发送
ctx.write(resp);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将缓冲区的数据写入SocketChannel
ctx.flush();
}
}

这里增加了counter来统计接收到报文的数量。

客户端代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class TimeClientHandler extends ChannelHandlerAdapter {
private ByteBuf msgSendBuf;
private int counter = 0;
public TimeClientHandler() {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 链路建立成功后,将Server Time请求发送给服务端
for (int i=0;i<100;i++) { // 这里循环发送100次请求
// 待发送数据
String req = "Server Time";
msgSendBuf = Unpooled.copiedBuffer(req.getBytes());
ctx.writeAndFlush(msgSendBuf);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收服务端响应
ByteBuf buf = (ByteBuf) msg;
byte[] resp = new byte[buf.readableBytes()];
buf.readBytes(resp);
String response = new String(resp, "UTF-8");
System.out.println("接收到服务端响应:" + response + ",counter:" + ++counter);
}
}

客户端在连接上服务端后,循环发送100条报文。在Handler类中增加了counter来统计接收到的服务器响应的次数。

运行结果如下:
服务端

客户端:

可以看到服务端只接收到2次,客户端并没有接收到服务端的响应。
服务端只接收到2次的原因是发生了TCP粘包,第1次接收的报文长度是1024。
客户端没有接收到服务器响应是因为由于发送了粘包,导致不符合服务端响应的条件。

使用LineBasedFrameDecoder和StringDecoder解决粘包

服务端:

1
2
3
4
5
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TimeServerHandler());
}

增加了LineBasedFrameDecoder和StringDecoder解码器。

TimeServerHandler的channelRead方法修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("接收到客户端请求:" + body + ",counter:" + ++counter);

// 如果接受到的消息时Server Time,则异步将服务端当前时间发送给客户端。
if ("Server Time".equalsIgnoreCase(body)) {
byte[] data = ((new Date()).toString() + System.getProperty("line.separator")).getBytes();
ByteBuf resp = Unpooled.copiedBuffer(data);
// 这里write方法只是将数据写入缓冲区,并没有真正发送
ctx.write(resp);
}
}

1.直接将msg转换为String;
2.响应的消息最后增加了换行符。

客户端:

1
2
3
4
5
6
7
8
9
10
b.group(group)
.channel(NioSocketChannel.class) // 设置线程的Channel
.option(ChannelOption.TCP_NODELAY, true) // 设置NIOSocketChannel的参数
.handler(new ChannelInitializer<SocketChannel>() { // 绑定I/O事件处理类
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});

增加了LineBasedFrameDecoder和StringDecoder解码器。

TimeClientHandler的channelActive和channelRead方法修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 链路建立成功后,将Server Time请求发送给服务端
String req = "Server Time" + System.getProperty("line.separator");
for (int i=0;i<100;i++) { // 这里循环发送100次请求
// 待发送数据
msgSendBuf = Unpooled.copiedBuffer(req.getBytes());
ctx.writeAndFlush(msgSendBuf);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收服务端响应
String response = (String) msg;
System.out.println("接收到服务端响应:" + response + ",counter:" + ++counter);
}

1.发送的报文最后增加换行符;
2.接收的消息msg直接转为String。

运行结果:
服务端

客户端

LineBasedFrameDecoder和StringDecoder的原理分析

LineBasedFrameDecoder的工作原理是依次遍历ByteBuf中的可读直接,看是否有\r\n或\n,如果有,就以此位置为结束为止,从可读索引到结束位置区间的字节就组成了一行。
它是以换行符为标志的解码器。支持携带结束符或不携带结束符两种解码方式,同时支持配置单行的最大长度。如果在连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略之前读到的异常码流。

StringDecoder的功能非常简单,就是将接受到的对象转换层字符串,然后继续调用后面的Handler。
LineBasedFrameDecoder和StringDecoder组合起来就是支持换行的文本解码器,被设计用来支持TCP粘包和拆包。

参考《Netty权威指南》

Donny wechat
欢迎关注我的个人公众号
打赏,是超越赞的一种表达。
Show comments from Gitment