这里的实现方式是:将消息分为两部分,也就是消息头和消息尾,消息头中写入要发送数据的总长度,通常是在消息头的第一个字段使用int值来标识发送数据的长度。

首先我们写一个Encoder,我们继承自MessageToByteEncoder ,把对象转换成byte,继承这个对象,会要求我们实现一个encode方法:

1
2
3
4
5
6
7
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
byte[] body = convertToBytes(msg); //将对象转换为byte,伪代码,具体用什么进行序列化,你们自行选择。可以使用我上面说的一些
int dataLength = body.length; //读取消息的长度
out.writeInt(dataLength); //先将消息长度写入,也就是消息头
out.writeBytes(body); //消息体中包含我们要发送的数据
}

那么当我们在Decode的时候,该怎么处理发送过来的数据呢?这里我们继承ByteToMessageDecoder方法,继承这个对象,会要求我们实现一个decode方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < HEAD_LENGTH) { //这个HEAD_LENGTH是我们用于表示头长度的字节数。 由于上面我们传的是一个int类型的值,所以这里HEAD_LENGTH的值为4.
return;
}
in.markReaderIndex(); //我们标记一下当前的readIndex的位置
int dataLength = in.readInt(); // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4
if (dataLength < 0) { // 我们读到的消息体长度为0,这是不应该出现的情况,这里出现这情况,关闭连接。
ctx.close();
}

if (in.readableBytes() < dataLength) { //读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
in.resetReaderIndex();
return;
}

byte[] body = new byte[dataLength]; // 嗯,这时候,我们读到的长度,满足我们的要求了,把传送过来的数据,取出来吧~~
in.readBytes(body); //
Object o = convertToObject(body); //将byte数据转化为我们需要的对象。伪代码,用什么序列化,自行选择
out.add(o);
}

下面来一个示例(实例只做了字符串的处理,其他自定义对象的处理参考上面)。
服务端(接收端):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Server {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 添加自定义的解码器
socketChannel.pipeline().addLast(new MyCustomMessageDecoder());
socketChannel.pipeline().addLast(new ServerMessageHandler());
}
});
try {
ChannelFuture future = bootstrap.bind(9999).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}

}
}

自定义的消息解码器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class MyCustomMessageDecoder extends ByteToMessageDecoder {
// 消息头:发送端写的是一个int,占用4字节。
private final static int HEAD_LENGTH = 4;

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//
if (in.readableBytes() < HEAD_LENGTH) {
return;
}

// 标记一下当前的readIndex的位置
in.markReaderIndex();

// 读取数据长度
int dataLength = in.readInt();
// 我们读到的消息体长度为0,这是不应该出现的情况,这里出现这情况,关闭连接。
if (dataLength < 0) {
ctx.close();
}

//读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}

// 将缓冲区的数据读到字节数组
byte[] body = new byte[dataLength];
in.readBytes(body);
//将byte数据转化为我们需要的对象。伪代码,用什么序列化,自行选择
Object msg = convertToObj(body);
out.add(msg);
}

private Object convertToObj(byte[] body) {
return new String(body,0,body.length);
}
}

Server端的消息处理器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
private int messageCount = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String _msg = (String) msg;
System.out.println("["+(++messageCount)+"]接收到消息:" + _msg);

// 注意:业务异常需要处理,不能不管,否则会调用exceptionCaught()

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Client
{
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_SNDBUF,10)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 增加自定义编码器
socketChannel.pipeline().addLast(new MyCustomMessageEncoder());
socketChannel.pipeline().addLast(new ClientMessageHandler());
}
});

try {
ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}

}
}

自定义的消息编码器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MyCustomMessageEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
// 要发送的数据
// 这里如果是自定义的类型,msg即为自定义的类型,需要转为byte[]
byte[] body = ((ByteBuf)msg).array();

// 数据长度
int dataLength = body.length;
// 缓冲区先写入数据长度
out.writeInt(dataLength);
// 再写入数据
out.writeBytes(body);
}
}

客户端的消息处理器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ClientMessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String msg = "hello,world";
byte[] data;
ByteBuf buf;
for (int i=0;i<100;i++) {
data = (msg+i).getBytes();
buf = Unpooled.copiedBuffer(data);
ctx.writeAndFlush(buf);
}
System.out.println("100条 消息发送完毕");
ctx.close();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

运行效果
客户端:

服务端:

参考:https://blog.csdn.net/AlbertFly/article/details/51533992