自己有个毛病,写笔记的时候不知道从哪里开始说起…
以工作中用到的tcp为例子,总结一下netty的用法和坑。

Giao 哥镇楼

我是假的目录 0.0 :

  • 服务端启动
  • 怎么写逻辑处理类handler
  • 持有合法channel以及发送消息
  • 超时策略的处理方式及注意事项
  • 拆包/粘包处理思路
  • 客户端异常情况的重连方式
  • 如何使用netty开发websocket端
  • 如何用客户端代码搞jmeter测试高访问量下服务端抗压能力
  • netty 多服务端的开发思路
  • 剩下的一些坑(crc8 )

1.服务端启动

  • ServerBootstrap这个类代表一个socket服务,创建它,然后把它放在服务端的容器中。
1
2
3
4
5
6
7
8
9
10
11
12
13
ServerBootstrap b = new ServerBootstrap();
// 这里的EventLoopGroup 可以设置数量
b.group(new EventLoopGroup(),new EventLoopGroup())
.channel(NioServerSocketChannel.class)
// 放置自己的channelHandler,如果想使用spring 上下文也可以autowired
.childHandler(new MyChannelHandler());
// 这里是netty的配置属性
Map<ChannelOption<?>, Object> tcpChannelOptions = (...);
Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
for (@SuppressWarnings("rawtypes")
ChannelOption option : keySet) {
b.option(option, tcpChannelOptions.get(option));
}
  • 服务端的配置属性我用的不多,大概是三个

    属性放置在ChannelOption作为key的Map中,在声名ServerBootstrap时放入ServerBootstrap.option(?) 方法中

我的channelOption:

name value 解释
ChannelOption.SO_KEEPALIVE true Socket参数,连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。
ChannelOption.SO_BACKLOG 1 Socket参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows为200,其他为128。
ChannelOption.TCP_NODELAY true TCP参数,立即发送数据,默认值为Ture(Netty默认为True而操作系统默认为False)。该值设置Nagle算法的启用,改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量,如果需要发送一些较小的报文,则需要禁用该算法。Netty默认禁用该算法,从而最小化报文传输延时。

具体的配置可以看Api文档,或者看这篇文章:
netty Channel Option

2.怎么写逻辑处理类handler

先介绍一下我们项目中的通信规则:
数据消息是一串byte数组,校验方式用的CRC8,分割符是十六进制的FFFF,占据两个byte
格式是这样的:

头部 内容 crc8 结尾
长度(byte): 1 x 1 2

简单的ChannelHandler 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@ChannelHandler.Sharable // 这个注解的意思是handler可以被多个channel安全的共享
public class MyChannelHandler extends ChannelInboundHandlerAdapter{
// 主要的消息接收的方法,所有的消息都走的这个方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {}

// 字面意思,代表消息接收完毕时调用的方法
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}

// 发生异常时调用该方法
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {}

// socket最初连接的方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {}

// socket断开时调用该方法
@Override
public void channelInactive(ChannelHandlerContext ctx) {}

// socket 通信超时时调用该方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {}
}

GIAO!一个简单的channelHandler 大概就是上面这个样子,这里有几个需要注意的地方:

  • ChannelHandler可以继承ChannelInboundHandlerAdapter,或者是它的子类SimpleChannelInboundHandler<T>

    • SimpleChannelInboundHandler对父类进行了简单的封装,当使用SimpleChannelInboundHandler时需要规范消息类型<T>,一般是ByteBuf
    • 当使用ChannelInboundHandlerAdapter时要注意在接受完消息后,应该把msg手动释放掉,而SimpleChannelInboundHandler已经帮助你做了这个事情。
      • 使用方法:ReferenceCountUtil.release(msg);进行释放,其实只要是使用了ByteBuf,都要进行这个操作,这里的原因主要涉及netty对于堆外内存的使用了,有兴趣的可以自行百度。
  • 我们可以根据头部区分消息类型,根据类型不同进行不同的逻辑处理方法。

  • 假如你有类似数组、Hash表存取的操作,最好把逻辑处理这部分操作放到内存队列中去处理。

    • netty虽然是全异步的,但是你的服务端却并不是,假如你有类似数组、Hash表存取的操作,还是应该放到队列中去,或者你应该把这里所有操作Jvm内存数据结构的操作都改成线程安全的。
      • ps:这里用的内存队列是Disruptor,还挺好用的,以后会写一篇关于disruptor的总结。
    • 所有处理逻辑放入队列有好处也有坏处,当你真的这么干的时候,有可能线上服务器环境会显示你的netty很稳定,cpu开销等等都比较理想,然而事实上很可能并非如此。
  • 一个channel从注册到断开的顺序到底是怎样的

      1. channelRegistered
      1. channelActive
      1. channelInactive
      1. channelUnregistered
  • 超时处理userEventTriggered 到底应该咋处理:

    超时策略可以处理一些客户端网络突然断开时的问题
    netty可以设置客户端通信时间——读超时,写超时以及读写超时
    当发生该类超时的时候,我们就可以在服务端进行主动把channel踢掉线的这种操作。
    下文中详细写~

3.持有合法channel以及发送消息

  • 当客户端在建立连接之后,经过了一系列认证判断啥的过程,一个合法的长连接应该被合理的引用起来,方便进行下一步的业务逻辑操作。
    • ChannelHandlerContext可以被理解为一个socket的上下文,拿到了它,就掌握了这个连接的通信。
    • so,可以放到hash中,数组中,或者跟其他客户端属性一起封装到一个类中,都可以。。。。

单发消息

ChannelHandlerContext.writeAndFlush()

单发消息回调

可能有些聪明的小朋友发现ChannelHandlerContext.writeAndFlush()返回的结果是一个channelFuture 回调类,
就天真的以为可以使用channelFuture.isSuccess() 方法验证这条消息是否真的给对方发送过去了。

NO,isSuccess只回调成功放置消息至tcp缓冲区!!!
也就是说即便是客户端直接断网,服务端这里发送消息后拿到的 success 依旧是 true

那特么怎么判断发送消息是否成功?

  • 一种思路是通过判断 Channel 上绑定的时间与当前时间只差是否超过了阈值,具体的实现我会以后整理一下。

群发消息广播

有的时候业务并不单单仅仅要求点对点发送,而是很多人同时收到某个消息,比如游戏中位置的移动,这是要同时告知所有场景内用户的。

  • 使用 ChannelGroup
1
2
3
4
ChannelGroup channels =new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
// 这相当于把这个连接放置到这个场景中
channels.add(你自己的channel)
channels.writeAndFlush();

就酱

4.超时策略的处理方式及注意事项

  • 使用IdleStateHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TcpProtocolInitalizer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 这里的“30,30,60”分别代表读超时,写超时以及读写超时时间,单位是秒
pipeline.addLast("ping", new IdleStateHandler(30, 30, 60, TimeUnit.SECONDS));
// 这段代码主要是声名结尾关键字,这样netty方便对多条指令进行拆包
byte[] bytes = new byte[2];
bytes[0] = (byte)0xFF;
bytes[1] = (byte)0xFF;
ByteBuf delimeter = Unpooled.copiedBuffer(bytes);
try {
// 这里使用了DelimiterBasedFrameDecoder这个拆包器,也有String类型的等等
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimeter));
int port = ch.localAddress().getPort();
System.out.println("ch.localAddress().getPort()==" + port);
if (port == tcpConfig.getTcpPort()) {
// 我们在这里关联自己的逻辑channelHandler业务
pipeline.addLast("handler", serverHandler);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

这个类要在ServerBootstrap 启动之前注入initChannel()方法里,这样就会在handler中在userEventTriggered中收到超时的回调了。

5.拆包/粘包 处理思路

其实主要思路还是如何让netty知道一段指令到哪里才是结束~~~

  • 定长消息
  • 关键字作为结束符,要注意关键字需要避免重复 ——保证唯一性
  • 自定义消息格式,需要自己实现解码器

** 拆包器在初始化serverbootstrap 时生效 **

6.客户端异常情况的重连方式

几种思路:

  • 客户端主动发起连接请求,复用eventloopgroup, Bootstrap 创建新对象并覆盖到老的引用上,
    • 然后 Bootstrap 重新进行连接 ,该动作可以不限制重连次数

7.如何使用netty开发websocket端

  • 不想写了,偷懒了

8.如何用客户端代码搞jmeter测试高访问量下服务端抗压能力

利用jmeter进行代码测试在开发中还是很重要的,我临时写了一套代码放在git上了:
https://github.com/Drazen08/clientDemo.git

具体步骤

  • 引入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- https://mvnrepository.com/artifact/org.apache.jmeter/ApacheJMeter_core -->
<dependency>
<groupId>org.apache.jmeter</groupId>
<artifactId>ApacheJMeter_core</artifactId>
<version>5.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.jmeter/ApacheJMeter_java -->
<dependency>
<groupId>org.apache.jmeter</groupId>
<artifactId>ApacheJMeter_java</artifactId>
<version>5.0</version>
</dependency>
  • 声明主程序,继承AbstractJavaSamplerClient的一个类

  • 把代码打成jar包,注意需要把所有依赖都打入jar包

  • 在jmeter 内创建线程组…

9.netty 多服务端的开发思路

10.剩下的一些坑(crc8)

本文采用CC-BY-SA-3.0协议,转载请注明出处
Author: dadonggua