You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
8.9 KiB
8.9 KiB
Netty
成就点
- 负责公司千万级别实时在线用户长连消息推送系统
- 企业社交 Sass 完美解决方案
主要方案
- 完美应用示例(十万级): 线程池异步业务 + (Linux 句柄) + 单机Netty + 日志
- 异步化完美方案(百万级): 线程池异步业务 + (Linux 句柄) + HA高可用 + 日志
- 多服务端解决方案(千万级): Disruptor异步业务 + (Linux 句柄) + HA高可用 + 日志
参考资料
- https://www.imooc.com/t/6224286 慕课网牛人教程
Netty基本组件
- Netty对传统 Socket 进行封装
- NioEventLoop,Channel,ByteBuf,Pipeline,ChannelHandler
- NioEventLoop => Thread(监听客户端的连接 + 处理每个连接的读写线程)
- Channel => Socket 对一条连接的封装
- ByteBuf => IO Bytes 每一个Channel上数据流的处理
- Pipeline => Logic Chain 逻辑处理链
- Channel Handler => Logic 每一个逻辑
Netty服务端启动
- 分析服务端启动流程,包括服务端Channel的创建,初始化,以及注册到selector
- 服务端的启动在哪里初始化?
- 在哪里accept连接?
- 启动过程
- 创建服务端 Channel -> newChannel()
- 初始化服务端 Channel -> init()
- 注册 selector -> register()
- 端口绑定 -> doBind()
- 源码分析1 - 创建服务端Channel
- bind()[用户代码入口] -> initAndRegister()[初始化并注册] -> newChannel()[创建服务端channel]
- 反射创建服务端Channel : NioServerSocketChannel()[构造函数] -> newSocket()[通过jdk来创建底层jdk channel] -> NioServerSocketChannelConfig()[tcp参数配置类] -> AbstractNioChannel() ->
- configureBlocking(false)[阻塞模式] -> AbstractChannel()[创建id, unsafe, pipeline]
- 源码分析2 - 初始化服务端Channel
- init()[初始化入口] -> set ChannelOptions, ChannelAttrs [这个配置用的不多] -> set ChildOptions,ChildAttrs [每次创建的新连接都会把这两个属性配置上去] -> config handler[配置服务端pipeline] ->
- add ServerBootstrapAcceptor[添加连接器,默认添加特殊的Handler]
- init()[初始化入口] -> set ChannelOptions, ChannelAttrs [这个配置用的不多] -> set ChildOptions,ChildAttrs [每次创建的新连接都会把这两个属性配置上去] -> config handler[配置服务端pipeline] ->
- 源码分析3 - 注册 selector
- AbstractChannel.register(channel)[入口] -> this.eventLoop = eventLoop[绑定线程] -> register0()[实际注册] -> doRegister()[调用jdk底层注册] -> invokeHandlerAddedIfNeeded()[做一些事件的回调] ->
- fireChannelRegistered()[传播事件]
- AbstractChannel.register(channel)[入口] -> this.eventLoop = eventLoop[绑定线程] -> register0()[实际注册] -> doRegister()[调用jdk底层注册] -> invokeHandlerAddedIfNeeded()[做一些事件的回调] ->
- 源码分析4 - 端口绑定
- AbstractUnsafe.bind()[入口] -> doBind() -> javaChannel().bind()[jdk底层绑定] -> pipeline.fireChannelActive()[传播事件] -> HeadContext.readIfIsAutoRead()
NioEventLoop
- 分析Netty reactor 线程处理过程,包括事件监听,事件处理,常规任务处理和定时任务处理
- 问题:
- 默认情况下, Netty 服务端启多少线程? 何时启动?
- Netty是如何解决jdk空轮询Bug的?会导致CPU飙升到100%
- Netty是如何保证异步串行无锁化?
- NioEventLoop 创建
- new NioEventLoopGroup()[线程组, 默认2*CPU] -> new ThreadPerTaskExecutor()[线程创建器] -> for(){new Child()}[循环构造NioEventLoop对象数组] ->
- chooserFactory.newChooser()[线程选择器:给每个新连接分配NioEventLoop线程]
- new NioEventLoopGroup()[线程组, 默认2*CPU] -> new ThreadPerTaskExecutor()[线程创建器] -> for(){new Child()}[循环构造NioEventLoop对象数组] ->
- 深入剖析 ThreadPerTaskExecutor()
- 每次执行任务都会创建一个线程实体 -> FastThreadLocalThread
- NioEventLoopGroup 线程命名规则 nioEventLoop-1-xx
- 深入剖析 newchild()
- 保存线程执行器 ThreadPerTaskExecutor
- 创建一个 MpscQueue
- 创建一个 selector
- 深入剖析 chooserFactory.newChooser() 创建线程选择器
- chooserFactory.newChooser() -> isPowerOfTwo()[判断是否是2的幂, 如2,4,8,16] ->[TRUE] PowerOfTwoEventExecutorChooser[优化] -> index++ & (length-1) ->
- [FALSE] GenericEventExecutorChooser[普通] -> abs(index++ % length)
- ps: 计算机底层 & 操作比 取模操作高效的多, &操作是二进制的操作
- chooserFactory.newChooser() -> isPowerOfTwo()[判断是否是2的幂, 如2,4,8,16] ->[TRUE] PowerOfTwoEventExecutorChooser[优化] -> index++ & (length-1) ->
- NioEventLoop 启动触发器
- 服务端启动绑定端口
- 新连接接入通过 chooser 绑定一个 NioEventLoop
- 启动流程
- bind() -> execute(task)[入口] -> startThread() -> doStartThread()[创建线程] -> ThreadPerTaskExecutor.execute()[线程执行器进行创建] ->
- thread = Thread.currentThread()[保存创建的线程] -> NioEventLoop.run()[启动]
- bind() -> execute(task)[入口] -> startThread() -> doStartThread()[创建线程] -> ThreadPerTaskExecutor.execute()[线程执行器进行创建] ->
- NioEventLoop 执行逻辑
- NioEventLoop.run() -> for(;;) -> select()[检查是否有io事件] -> processSelectedKeys()[处理io事件] -> runAllTasks()[处理异步任务队列]
- select() 方法执行逻辑 - 检测IO事件
- deadline 以及任务穿插逻辑处理
- 阻塞式的 select
- 解决 jdk 空轮训的 bug
新连接接入
- 分析新连接接入以及绑定reactor线程,绑定到selector的过程
pipeline
- 分析pipeline的创建,初始化,添加和删除ChannelHandler,事件传播机制,异常传播机制
ByteBuf
- 详细分析ByteBuf种类,如何减少多线程内存分配竞争,不同大小内存是如何分配的
Netty解码
- 详细分析Netty解码原理,解码器抽象,以及几种常见的解码器
Netty编码及writeAndFlush()
- writeAndFlush传播流程,编码器抽象,writeAndFlush详细流程
Netty性能优化工具类解析
- 详细分析Netty里面最高频使用的两个性能优化类FastThreadLocal以及轻量级对象池Recycler
Netty设计模式应用
- 分析各类常见设计模式以及在Netty中的应用
Netty高性能并发调优
- 系统层面单机如何支持百万连接,如何提升应用层面性能
- 单机通过修改Linux句柄数目限制提升
- 应用层主要是通过线程池和异步方式进行调优
Netty调优
实战及应用
- 开源软件中的应用
- 实现 RPC
数据可靠性通信场景分析与架构设计 - 实际场景
- 数据通信要求实时性高, 且高性能, 异构系统
- 需要保障不同的业务请求对应不同的实现
- 支持负载均衡策略(TCP级别的), 故障切换
- 需要可靠性保障的支持, 数据不允许丢失
- 定时任务是对中间状态进行定时的扫描, 进行消息推送补偿, 改写为最终状态的
1. 思考的问题
2 自定义数据格式 - 定义使用的数据结构
- Message 的格式是通用的
- 只需要把业务相关的数据格式进行定义即可,就是对应的什么模型类
3 整合 SpringBoot - 思路
4. 高可用 Netty 架构分析
- 常见的负载均衡策略 : LBS / Haproxy + Keepalived / Nginx
- 选择最小连接数的策略
Netty调优
Netty调优 - 内存调优
1. netty客户端连接池泄漏问题复现及原因解析
-
问题复现: 生产环境使用 netty 作为客户端通信框架, 在客户端创建一个 tcp 连接池, 随着也无压力的上升, 在高峰期出现OOM问题, 需要重启才能恢复
-
参考: com.baiye.case2
-
Bootstrap 不是线程安全的
- 真正的逻辑是 bossGroup 来进行处理的
-
总结:编码规范问题, 注意客户端连接建立的编写, 关闭chanel的问题
2. netty内存池泄漏问题复现及排查解析
-
问题复现: 服务端使用 netty 作为通信框架, 负责消息接入和路由转发, 在功能测试时没有发现问题, 转性能测试后, 运行一段时间发现内存分配异常, 服务端无法接收请求消息, 系统吞吐量为0
-
当消息进来的时候, Netty给他分配的内存没有释放,最终导致了内存泄漏
-
ByteBuf申请和释放场景分析
- 基于内存池的请求 ByteBuf
- 我们建议在使用 ServerHandler 时候实际使用 SimpleChannelInboundHandler , 不会存在内存泄露问题
- 基于非内存池请求的 ByteBuf (不推荐), 还是要自己释放
- 基于内存池的响应 ByteBuf
- 基于非内存池的响应 ByteBuf (不推荐) 还是要自己释放
- 基于内存池的请求 ByteBuf
-
总结: ServerHandler 选用 SimpleChannelInboundHandler 来规避内存泄漏问题,但是仅限于你的Handler里面是同步处理的逻辑, 如果是异步的处理, 只能用 ChannelInboundHandlerAdapter
3. netty 内存池 的性能对比
-
4.x 引入 内存池机制, 对netty的提升很大
-
com.baiye.case3.ByteBufPerformance 池化代码测试
-
netty 内存池很大程度上提升了系统性能, 但是无用则会代理内存泄漏问题, 由于内存的申请和释放可能由Netty框架隐形完成, 所以增加了内存管理复杂度,所以必须深入理解ByteBuf的申请和释放机制,以免误用
-
总结: 使用 4.x 提供的池化技术 PooledByteBufAllocator , 提升性能
4. ByteBuf 故障排查及优化 - HTTP 响应 Body 获取异常
- HTTP协议栈 ByteBuf 使用问题
- netty 解决好 HTTP 协议的问题, 就可以当 Tomcat 进行使用