[文档修改](master): Netty 学习笔记

更新了Netty的实战及一些简单的参数调优 2022-08-24 01:26:50
master
土豆兄弟 2 years ago
parent 26ccc04ee0
commit a27f0b24e3

@ -283,6 +283,136 @@ protoc --java_out=[生成文件的目录]
- 开源实现: Apache Common Pool
- Netty 轻量级对象池实现 io.netty.util.Recycler [他自己的场景来建立的]
## 4. 基本原理编码
### 4.1 编写网络应用程序的基本步骤
- ![编写网络应用程序的基本步骤](pic/编写网络应用程序的基本步骤.png)
- ![编写网络应用程序的基本步骤1](pic/编写网络应用程序的基本步骤1.png)
### 4.2 数据结构设计
- ![Netty案例数据结构设计](pic/Netty案例数据结构设计.png)
- opration/ opration result - 封装成 Body [这里采用的是Json编码]
- version - 头信息, 版本号 - 处理兼容性
- opCode - 头信息, opration 类型 - Json解析的时候用到
- streamId - 头信息, 标识信息唯一的Id
- length - 处理粘包和半包问题
### 4.3 代码
- case 查看 com.baiye.case5
### 4.4 Netty 编程中的易错点
- LengthFieldBasedFrameDecoder 中 initialBytesToStrip 未考虑设置
- 见 com.baiye.case5.server.codec.OrderFrameDecoder
- 见 io.netty.handler.codec.LengthFieldBasedFrameDecoder 源码
- >* lengthFieldOffset = 0
>* lengthFieldLength = 2
>* <b>lengthAdjustment</b> = <b>-2</b> (= the length of the Length field)
>* initialBytesToStrip = 0
>*
>* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
>* +--------+----------------+ +--------+----------------+
>* | Length | Actual Content |----->| Length | Actual Content |
>* | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
>* +--------+----------------+ +--------+----------------+
>
- 一定要进行设置,这样才会跳过 Length field 的长度
- 避免使用 LengthFieldBasedFrameDecoder(...) 最简单的构造器, 因为最简单的构造器 initialBytesToStrip 是 0
- ChannelHandler 顺序不正确
- 如何保证顺序正确:
- 先请求后响应, 包 -> 协议 -> 协议 -> 包
- 业务的Handler 顺序都可以 , 可以放在最后写
- ChannelHandler 该共享不共享, 不该共享却共享,触发多线程问题
- LoggingHandler 就是要共享的, 因为Channel 都有自己的 pipeline , 不共享就是浪费空间, 问题也不大
- PS: 总结有哪些需要共享的 Handler, 哪些不需要共享的 Handler
- 分配 ByteBuf: 分配器直接用 ByteBufAllocator.DEFAULT 等, 而不是采用 ChannelHandlerContext.alloc()
- 见 com.baiye.case5.server.codec.OrderProtocolEncoder
- ctx 中可以获取 alloc, 不要自己去创建, 因为当源码重新创建了一个 alloc 的实现,会有可能造成实现的不一致
- 未考虑 ByteBuf 的释放
- 继承 SimpleChannelInboundHandler 可以帮我们进行释放资源, 如果使用 ChannelInboundHandlerAdapter 就要自己进行释放 ReferenceCountUtil.release(msg); , 不接收消息也要进行释放 fireChannelRead()
- 错以为 ChannelHandlerContext.writer(msg) 就写出数据了
- writer 仅仅是将消息加到队列中, 不是进行真正的发送
- 乱用 ChannelHandlerContext.channel().writerAndFlush(msg)
- ctx.writeAndFlush(responseMessage); : 是在当前 pipelineHandler 的位置寻找下一个符合条件的 Handler,所以并不是把 pipeline 重新走了一遍
- ctx.channel().writerAndFlush(msg); : 是调用的 pipeline.writeAndFlush(msg); , 表示 pipeline 重新走了一遍, 如果是中间的 Handler 就会造成死循环
- ctx.channel().writerAndFlush(msg); 常用在客户端, ctx.writeAndFlush(responseMessage); 用在服务端
## 5. 实战及调优
### 5.1 调优参数: 调整 System 参数
- Linux 系统参数
- ![调整System参数](pic/调整System参数.png)
- 1: 查看 云服务器 支持不支持这种帮助你进行一键调优的工具或者配置
```shell
/proc/sys/net/ipv4/tcp_keepalive_time
```
---
- Netty 支持的系统参数
- Netty 支持的系统参数 ChannelOption.XXX 讨论
- 不考虑 UDP
- IP_MULTICAST_TTL
- 不考虑 OIO 编程
- ChannelOption<Integer>SO_TIMEOUT=("SO_TIMEOUT"); - 控制阻塞时间
- 参数列表
- ![Netty支持的调优参数表](pic/Netty支持的调优参数表.png)
- SO_SNDBUF 和 SO_RCVBUF 参数现在因为Linux是动态的, 所以不用调整
- SO_KEEPALIVE 我们用应用层控制, 所以这里保持默认关闭
- SO_REUSEADDR
- SO_LINGER
- IP_TOS
- TCP_NODELAY 这个要设置为 true - 小报文比较多的情况下
- ![Netty支持的调优参数表1](pic/Netty支持的调优参数表1.png)
- SO_RCVBUF
- SO_REUSEADDR
- SO_BACKLOG
- ~~IP_TOS~~
- 参数调优要点 - 权衡 Netty 核心参数
- option/childOption 要分清, 不会报错, 也不会生效
- 不懂不要动, 避免过早优化
- 可配置(动态配置更好)
- 需要调整的参数
- 最大打开文件数
- TCP_NODELAY SO_BACKLOG SO_REUSEADDR (酌情处理)
- ![Netty支持的调优参数表2](pic/Netty支持的调优参数表2.png)
- ![Netty支持的调优参数表3](pic/Netty支持的调优参数表3.png)
- 第一个参数值, 不大是因为这是每个连接的范围值, 其实很大了
- 两个兄弟的关系
- ![Netty支持的调优参数表4](pic/Netty支持的调优参数表4.png)
- 功能上可以细分为3类
- ![Netty支持的调优参数表5](pic/Netty支持的调优参数表5.png)
- ![Netty支持的调优参数表6](pic/Netty支持的调优参数表6.png)
- ![Netty支持的调优参数表7](pic/Netty支持的调优参数表7.png)
- 服务端调优 com.baiye.case5.server.ServerV1
- 客户端调优 com.baiye.case5.client.OrderClientV4
```shell
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
```
- SocketChannel -> .childOption
- ServerSocketChannel -> .option

@ -18,11 +18,30 @@
</properties>
<dependencies>
<!-- Netty 依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.28.Final</version>
</dependency>
<!-- 工具类 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.22</version>
</dependency>
</dependencies>
</project>

@ -0,0 +1,73 @@
package com.baiye.case5.client;
import com.baiye.case5.client.codec.ClientProtocolDecoder;
import com.baiye.case5.client.codec.ClientProtocolEncoder;
import com.baiye.case5.client.codec.OrderFrameDecoder;
import com.baiye.case5.client.codec.OrderFrameEncoder;
import com.baiye.case5.client.handler.ClientHandler;
import com.baiye.case5.common.RequestMessage;
import com.baiye.case5.common.order.OrderOperation;
import com.baiye.case5.util.IdUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* v1 -
*/
public class OrderClientV0 {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
final ClientHandler clientHandler = new ClientHandler();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);
try {
bootstrap.group(boss);
bootstrap.channel(NioSocketChannel.class);
// bootstrap.option(NioChannelOption.TCP_NODELAY, true);
// 客户端连接服务端超时设置
// bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS, 10 * 1000);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(loggingHandler);
p.addLast(new OrderFrameDecoder());
p.addLast(new OrderFrameEncoder());
p.addLast(new ClientProtocolDecoder());
p.addLast(new ClientProtocolEncoder());
// ... 这里是考虑客户端不需要业务处理的 handler
p.addLast(clientHandler);
}
});
// 建立连接 - sync
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8090).sync();
// 构建消息体
OrderOperation orderOperation = new OrderOperation(1, "fish");
// 生成Id
long id = IdUtil.nextId();
// 发送消息
channelFuture.channel().writeAndFlush(new RequestMessage(id, orderOperation));
channelFuture.channel().closeFuture().get();
}catch (Exception e){
e.printStackTrace();
}finally {
boss.shutdownGracefully();
}
}
}

@ -0,0 +1,78 @@
package com.baiye.case5.client;
import com.baiye.case5.client.codec.*;
import com.baiye.case5.client.handler.ClientHandler;
import com.baiye.case5.client.handler.dispatcher.ResponseMessagePendingCenter;
import com.baiye.case5.common.order.OrderOperation;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* OperationToRequestMessageEncoder
*
* @author q
*/
public class OrderClientV1 {
public static void main(String[] args) {
final ClientHandler clientHandler = new ClientHandler();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);
final ResponseMessagePendingCenter center = new ResponseMessagePendingCenter();
NioEventLoopGroup boss = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(boss);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 编解码
pipeline.addLast(new OrderFrameDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new ClientProtocolDecoder());
pipeline.addLast(new ClientProtocolEncoder());
// 请求内容 Encode
pipeline.addLast(new OperationToRequestMessageEncoder());
// Netty 日志 - handler
pipeline.addLast(loggingHandler);
// pipeline.addLast(new ResponseMessagePromiseHandler(center));
// pipeline.addLast(clientHandler);
}
});
// 建立连接 - sync
ChannelFuture ch = bootstrap.connect("127.0.0.1", 8090).sync();
// 因为上面封装了 OperationToRequestMessageEncoder - 这里直接可以用 OrderOperation 替代下面的 fix
// fixme
/* RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), new OrderOperation(1002, "ji"));
ch.channel().writeAndFlush(requestMessage);*/
OrderOperation orderOperation = new OrderOperation(1001, "tudou");
// 发送消息
ch.channel().writeAndFlush(orderOperation);
// long id = IdUtil.nextId();
// ResponseMessagePromise promise = center.addPromise(id);
// ch.channel().writeAndFlush(new RequestMessage(id, orderOperation));
// ResponseMessage responseMessage = promise.get();
// System.out.println("阻塞等待回复:" + responseMessage);
ch.channel().closeFuture().get();
}catch (Exception e){
e.printStackTrace();
}finally {
boss.shutdownGracefully();
}
}
}

@ -0,0 +1,87 @@
package com.baiye.case5.client;
import com.baiye.case5.client.codec.ClientProtocolDecoder;
import com.baiye.case5.client.codec.ClientProtocolEncoder;
import com.baiye.case5.client.codec.OrderFrameDecoder;
import com.baiye.case5.client.codec.OrderFrameEncoder;
import com.baiye.case5.client.handler.ClientHandler;
import com.baiye.case5.client.handler.dispatcher.ResponseMessagePendingCenter;
import com.baiye.case5.client.handler.dispatcher.ResponseMessagePromise;
import com.baiye.case5.client.handler.dispatcher.ResponseMessagePromiseHandler;
import com.baiye.case5.common.RequestMessage;
import com.baiye.case5.common.ResponseMessage;
import com.baiye.case5.common.order.OrderOperation;
import com.baiye.case5.util.IdUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.net.InetSocketAddress;
/**
* @author lianghuarong
* @date 2021/6/16 15:15
*/
public class OrderClientV2 {
public static void main(String[] args) {
final ClientHandler clientHandler = new ClientHandler();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);
final ResponseMessagePendingCenter center = new ResponseMessagePendingCenter();
NioEventLoopGroup boss = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
try {
b.group(boss);
b.channel(NioSocketChannel.class);
// 关闭后是否更早关闭释放端口win下查看端口占用netstat -aon|findstr "12979"(实际的端口)
b.option(NioChannelOption.SO_REUSEADDR, true);
b.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(loggingHandler);
p.addLast(new OrderFrameDecoder());
p.addLast(new OrderFrameEncoder());
p.addLast(new ClientProtocolDecoder());
p.addLast(new ClientProtocolEncoder());
p.addLast(new ResponseMessagePromiseHandler(center));
p.addLast(clientHandler);
}
});
ChannelFuture ch = b.connect("127.0.0.1", 8090).sync();
OrderOperation orderOperation = new OrderOperation(1, "fish");
long id = IdUtil.nextId();
ResponseMessagePromise promise = center.addPromise(id);
ch.channel().writeAndFlush(new RequestMessage(id, orderOperation));
ResponseMessage responseMessage = promise.get();
System.out.println("阻塞等待回复:" + responseMessage);
// 获取本地端口
InetSocketAddress inetSocketAddress = (InetSocketAddress)ch.channel().localAddress();
int port = inetSocketAddress.getPort();
System.out.println(port);
// 关闭,测试
ch.channel().close();
}catch (Exception e){
e.printStackTrace();
}finally {
boss.shutdownGracefully();
}
}
}

@ -0,0 +1,78 @@
package com.baiye.case5.client;
import com.baiye.case5.client.codec.*;
import com.baiye.case5.client.handler.dispatcher.OperationResultFuture;
import com.baiye.case5.client.handler.dispatcher.RequestPendingCenter;
import com.baiye.case5.client.handler.dispatcher.ResponseDispatcherHandler;
import com.baiye.case5.common.OperationResult;
import com.baiye.case5.common.order.OrderOperation;
import com.baiye.case5.util.IdUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* future -> streamId
*/
public class OrderClientV3 {
public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
try {
bootstrap.group(nioEventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// 编解码
pipeline.addLast(new OrderFrameDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new ClientProtocolDecoder());
pipeline.addLast(new ClientProtocolEncoder());
// future 对应的 handler
pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
// 请求内容 Encode
pipeline.addLast(new OperationToRequestMessageEncoder());
// Netty 日志 - handler
pipeline.addLast(loggingHandler);
}
});
// 建立连接 - sync
ChannelFuture ch = bootstrap.connect("127.0.0.1", 8090).sync();
// 构建消息体
OrderOperation orderOperation = new OrderOperation(1001, "tudou");
// 生成 streamId
long streamId = IdUtil.nextId();
// 构建请求对应的 future
OperationResultFuture operationResultFuture = new OperationResultFuture();
// 发送消息之前把 streamId + future 放入 center 中
requestPendingCenter.add(streamId, operationResultFuture);
// 发送消息
// TODO: 2022/8/23 当发送消息失败的时候 future 也应该处理
ch.channel().writeAndFlush(orderOperation);
// 使用 future 来等待请求的结果
OperationResult operationResult = operationResultFuture.get();
// channel 关闭
ch.channel().closeFuture().get();
}catch (Exception e){
e.printStackTrace();
}finally {
nioEventLoopGroup.shutdownGracefully();
}
}
}

@ -0,0 +1,84 @@
package com.baiye.case5.client;
import com.baiye.case5.client.codec.*;
import com.baiye.case5.client.handler.dispatcher.OperationResultFuture;
import com.baiye.case5.client.handler.dispatcher.RequestPendingCenter;
import com.baiye.case5.client.handler.dispatcher.ResponseDispatcherHandler;
import com.baiye.case5.common.OperationResult;
import com.baiye.case5.common.order.OrderOperation;
import com.baiye.case5.util.IdUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
*
*
* @author q
*/
public class OrderClientV4 {
public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
try {
bootstrap.group(nioEventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
// 参数调优1 超时 10s
bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS, 10 * 1000);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// 编解码
pipeline.addLast(new OrderFrameDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new ClientProtocolDecoder());
pipeline.addLast(new ClientProtocolEncoder());
// future 对应的 handler
pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
// 请求内容 Encode
pipeline.addLast(new OperationToRequestMessageEncoder());
// Netty 日志 - handler
pipeline.addLast(loggingHandler);
}
});
// 建立连接 - sync
ChannelFuture ch = bootstrap.connect("127.0.0.1", 8090).sync();
// 构建消息体
OrderOperation orderOperation = new OrderOperation(1001, "tudou");
// 生成 streamId
long streamId = IdUtil.nextId();
// 构建请求对应的 future
OperationResultFuture operationResultFuture = new OperationResultFuture();
// 发送消息之前把 streamId + future 放入 center 中
requestPendingCenter.add(streamId, operationResultFuture);
// 发送消息
// TODO: 2022/8/23 当发送消息失败的时候 future 也应该处理
ch.channel().writeAndFlush(orderOperation);
// 使用 future 来等待请求的结果
OperationResult operationResult = operationResultFuture.get();
// channel 关闭
ch.channel().closeFuture().get();
}catch (Exception e){
e.printStackTrace();
}finally {
nioEventLoopGroup.shutdownGracefully();
}
}
}

@ -0,0 +1,39 @@
package com.baiye.case5.client.codec;
import com.baiye.case5.common.ResponseMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.List;
/**
* 4. ByteBuf -> msg
*/
public class ClientProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// byte[] res = new byte[in.readableBytes()];
// in.copy().readBytes(res);
//// System.out.println(" in: " + new String(Base64.getEncoder().encode(res)));
// System.out.println(" in: " + bytesToHex(res));
ResponseMessage responseMessage = new ResponseMessage();
responseMessage.decode(in);
out.add(responseMessage);
}
public static String bytesToHex(byte[] bytes) {
StringBuffer sb = new StringBuffer();
for(int i = 0; i < bytes.length; i++) {
String hex = Integer.toHexString(bytes[i] & 0xFF);
if(hex.length() < 2){
sb.append(0);
}
sb.append(hex);
}
return sb.toString();
}
}

@ -0,0 +1,23 @@
package com.baiye.case5.client.codec;
import com.baiye.case5.common.RequestMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
/**
* 1. RequestMessage ->
*/
public class ClientProtocolEncoder extends MessageToMessageEncoder<RequestMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, RequestMessage msg, List<Object> out) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
msg.encode(buffer);
out.add(buffer);
}
}

@ -0,0 +1,19 @@
package com.baiye.case5.client.codec;
import com.baiye.case5.common.Operation;
import com.baiye.case5.common.RequestMessage;
import com.baiye.case5.util.IdUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
public class OperationToRequestMessageEncoder extends MessageToMessageEncoder<Operation> {
@Override
protected void encode(ChannelHandlerContext ctx, Operation operation, List<Object> out) throws Exception {
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), operation);
out.add(requestMessage);
}
}

@ -0,0 +1,27 @@
package com.baiye.case5.client.codec;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
* 3. -> TCP []
*/
public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
/**
*
*
*
*
*
*
*/
public OrderFrameDecoder() {
super(
Integer.MAX_VALUE,
0,
2,
0,
2
);
}
}

@ -0,0 +1,13 @@
package com.baiye.case5.client.codec;
import io.netty.handler.codec.LengthFieldPrepender;
/**
* 2.
*/
public class OrderFrameEncoder extends LengthFieldPrepender {
public OrderFrameEncoder() {
super(2);
}
}

@ -0,0 +1,20 @@
package com.baiye.case5.client.handler;
import com.baiye.case5.common.ResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* @author lianghuarong
* @date 2021/6/16 15:08
*/
@ChannelHandler.Sharable
public class ClientHandler extends SimpleChannelInboundHandler<ResponseMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage msg) throws Exception {
System.out.println("收到响应:" + msg);
}
}

@ -0,0 +1,8 @@
package com.baiye.case5.client.handler.dispatcher;
import com.baiye.case5.common.OperationResult;
import io.netty.util.concurrent.DefaultPromise;
public class OperationResultFuture extends DefaultPromise<OperationResult> {
}

@ -0,0 +1,45 @@
package com.baiye.case5.client.handler.dispatcher;
import com.baiye.case5.common.OperationResult;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* streamId -> Future
*
* @author q
*/
public class RequestPendingCenter {
private Map<Long, OperationResultFuture> map = new ConcurrentHashMap<>();
/**
* streamId future
*
* @param streamId id
* @param future future
*/
public void add(Long streamId, OperationResultFuture future){
this.map.put(streamId, future);
}
/**
* streamId future
*
* @param streamId id
* @param operationResult
*/
public void set(Long streamId, OperationResult operationResult){
OperationResultFuture operationResultFuture = this.map.get(streamId);
if (operationResultFuture != null) {
// 找到, 设置为成功
operationResultFuture.setSuccess(operationResult);
// 找到结果, 把整个结果进行移除, map容量控制
this.map.remove(streamId);
}
}
}

@ -0,0 +1,35 @@
package com.baiye.case5.client.handler.dispatcher;
import com.baiye.case5.common.ResponseMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* Center
*
* @author q
*/
public class ResponseDispatcherHandler extends SimpleChannelInboundHandler<ResponseMessage> {
/**
* requestPendingCenter
*/
private RequestPendingCenter requestPendingCenter;
/**
*
*/
public ResponseDispatcherHandler(RequestPendingCenter requestPendingCenter) {
this.requestPendingCenter = requestPendingCenter;
}
/**
* future
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage responseMessage) throws Exception {
// streamId(header) + result(body)
requestPendingCenter.set(responseMessage.getMessageHeader().getStreamId(), responseMessage.getMessageBody());
}
}

@ -0,0 +1,34 @@
package com.baiye.case5.client.handler.dispatcher;
import com.baiye.case5.common.ResponseMessage;
import io.netty.util.concurrent.DefaultPromise;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author lianghuarong
* @date 2021/6/17 14:09
*/
public class ResponseMessagePendingCenter {
private Map<Long, ResponseMessagePromise> map = new ConcurrentHashMap<>();
public void setSuccess(long streamId, ResponseMessage responseMessage){
if (map.containsKey(streamId)){
DefaultPromise promise = map.get(streamId);
promise.setSuccess(responseMessage);
map.remove(streamId);
}
}
public ResponseMessagePromise addPromise(long streamId){
if (map.containsKey(streamId)){
return map.get(streamId);
}
ResponseMessagePromise p = new ResponseMessagePromise();
map.put(streamId, p);
return p;
}
}

@ -0,0 +1,11 @@
package com.baiye.case5.client.handler.dispatcher;
import com.baiye.case5.common.ResponseMessage;
import io.netty.util.concurrent.DefaultPromise;
/**
* @author lianghuarong
* @date 2021/6/17 14:04
*/
public class ResponseMessagePromise extends DefaultPromise<ResponseMessage> {
}

@ -0,0 +1,25 @@
package com.baiye.case5.client.handler.dispatcher;
import com.baiye.case5.common.ResponseMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* @author lianghuarong
* @date 2021/6/17 14:15
*/
public class ResponseMessagePromiseHandler extends SimpleChannelInboundHandler<ResponseMessage> {
private ResponseMessagePendingCenter pendingCenter;
public ResponseMessagePromiseHandler(ResponseMessagePendingCenter pendingCenter){
this.pendingCenter = pendingCenter;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage msg) throws Exception {
pendingCenter.setSuccess(msg.getMessageHeader().getStreamId(), msg);
ctx.fireChannelRead(msg);
}
}

@ -0,0 +1,63 @@
package com.baiye.case5.common;
import com.baiye.case5.util.JsonUtil;
import io.netty.buffer.ByteBuf;
import lombok.Data;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import static cn.hutool.core.util.CharsetUtil.UTF_8;
/**
*
* @param <T>
*
*/
@Data
public abstract class Message<T extends MessageBody> {
private MessageHeader messageHeader;
private T messageBody;
public T getMessageBody() {
return messageBody;
}
/**
*
*
* @param byteBuf
*/
public void encode(ByteBuf byteBuf) {
byteBuf.writeInt(messageHeader.getVersion());
byteBuf.writeLong(messageHeader.getStreamId());
byteBuf.writeInt(messageHeader.getOpCode());
byteBuf.writeBytes(JsonUtil.toJson(messageBody).getBytes(StandardCharsets.UTF_8));
}
public abstract Class<T> getMessageBodyDecodeClass(int opCode);
/**
*
*
* @param msg
*/
public void decode(ByteBuf msg) {
int version = msg.readInt();
long streamId = msg.readLong();
int opCode = msg.readInt();
MessageHeader messageHeader = new MessageHeader();
messageHeader.setVersion(version);
messageHeader.setStreamId(streamId);
messageHeader.setOpCode(opCode);
this.messageHeader = messageHeader;
Class<T> bodyClazz = getMessageBodyDecodeClass(opCode);
T body = JsonUtil.fromJson(msg.toString(StandardCharsets.UTF_8), bodyClazz);
this.messageBody = body;
}
}

@ -0,0 +1,7 @@
package com.baiye.case5.common;
/**
* MessageBody
*/
public abstract class MessageBody {
}

@ -0,0 +1,16 @@
package com.baiye.case5.common;
import lombok.Data;
@Data
public class MessageHeader {
/**
* 1
*/
private int version = 1;
private long streamId;
private int opCode;
}

@ -0,0 +1,6 @@
package com.baiye.case5.common;
public abstract class Operation extends MessageBody{
public abstract OperationResult execute();
}

@ -0,0 +1,7 @@
package com.baiye.case5.common;
import lombok.Data;
@Data
public abstract class OperationResult extends MessageBody{
}

@ -0,0 +1,74 @@
package com.baiye.case5.common;
import com.baiye.case5.common.auth.AuthOperation;
import com.baiye.case5.common.auth.AuthOperationResult;
import com.baiye.case5.common.keepalive.KeepaliveOperation;
import com.baiye.case5.common.keepalive.KeepaliveOperationResult;
import com.baiye.case5.common.order.OrderOperation;
import com.baiye.case5.common.order.OrderOperationResult;
import java.util.function.Predicate;
public enum OperationType {
/**
*
*/
AUTH(1, AuthOperation.class, AuthOperationResult.class),
/**
*
*/
KEEPALIVE(2, KeepaliveOperation.class, KeepaliveOperationResult.class),
/**
*
*/
ORDER(3, OrderOperation.class, OrderOperationResult.class)
;
private int opCode;
private Class<? extends Operation> operationClazz;
private Class<? extends OperationResult> operationResultClazz;
OperationType(int opCode, Class<? extends Operation> operationClazz, Class<? extends OperationResult> operationResultClazz) {
this.opCode = opCode;
this.operationClazz = operationClazz;
this.operationResultClazz = operationResultClazz;
}
public Class<? extends Operation> getOperationClazz(){
return operationClazz;
}
public int getOpCode() {
return opCode;
}
public Class<? extends OperationResult> getOperationResultClazz() {
return operationResultClazz;
}
public static OperationType fromOpCode(int type){
return getOperationType(requestType -> requestType.opCode == type);
}
public static OperationType fromOperation(Operation operation){
return getOperationType(requestType -> requestType.operationClazz == operation.getClass());
}
public static OperationType fromOperationResult(OperationResult operationResult){
return getOperationType(requestType -> requestType.operationResultClazz == operationResult.getClass());
}
private static OperationType getOperationType(Predicate<OperationType> predicate){
OperationType[] values = values();
for (OperationType operationType : values) {
if(predicate.test(operationType)){
return operationType;
}
}
throw new AssertionError("no found type");
}
}

@ -0,0 +1,20 @@
package com.baiye.case5.common;
import java.util.Objects;
public class RequestMessage extends Message<Operation>{
@Override
public Class getMessageBodyDecodeClass(int opcode) {
return OperationType.fromOpCode(opcode).getOperationClazz();
}
public RequestMessage(){}
public RequestMessage(Long streamId, Operation operation){
MessageHeader messageHeader = new MessageHeader();
messageHeader.setStreamId(streamId);
messageHeader.setOpCode(OperationType.fromOperation(operation).getOpCode());
this.setMessageHeader(messageHeader);
this.setMessageBody(operation);
}
}

@ -0,0 +1,18 @@
package com.baiye.case5.common;
public class ResponseMessage extends Message <OperationResult>{
@Override
public Class getMessageBodyDecodeClass(int opcode) {
return OperationType.fromOpCode(opcode).getOperationResultClazz();
}
public ResponseMessage(){}
public ResponseMessage(Long streamId, OperationResult operationResult){
MessageHeader messageHeader = new MessageHeader();
messageHeader.setStreamId(streamId);
messageHeader.setOpCode(OperationType.fromOperationResult(operationResult).getOpCode());
this.setMessageHeader(messageHeader);
this.setMessageBody(operationResult);
}
}

@ -0,0 +1,24 @@
package com.baiye.case5.common.auth;
import com.baiye.case5.common.Operation;
import lombok.Data;
import lombok.extern.java.Log;
@Data
@Log
public class AuthOperation extends Operation {
private final String userName;
private final String password;
@Override
public AuthOperationResult execute() {
if("admin".equalsIgnoreCase(this.userName)){
AuthOperationResult orderResponse = new AuthOperationResult(true);
return orderResponse;
}
return new AuthOperationResult(false);
}
}

@ -0,0 +1,11 @@
package com.baiye.case5.common.auth;
import com.baiye.case5.common.OperationResult;
import lombok.Data;
@Data
public class AuthOperationResult extends OperationResult {
private final boolean passAuth;
}

@ -0,0 +1,23 @@
package com.baiye.case5.common.keepalive;
import com.baiye.case5.common.Operation;
import lombok.Data;
import lombok.extern.java.Log;
@Data
@Log
public class KeepaliveOperation extends Operation {
private long time ;
public KeepaliveOperation() {
this.time = System.nanoTime();
}
@Override
public KeepaliveOperationResult execute() {
KeepaliveOperationResult orderResponse = new KeepaliveOperationResult(time);
return orderResponse;
}
}

@ -0,0 +1,11 @@
package com.baiye.case5.common.keepalive;
import com.baiye.case5.common.OperationResult;
import lombok.Data;
@Data
public class KeepaliveOperationResult extends OperationResult {
private final long time;
}

@ -0,0 +1,26 @@
package com.baiye.case5.common.order;
import com.baiye.case5.common.Operation;
import lombok.Data;
@Data
public class OrderOperation extends Operation {
private int tableId;
private String dish;
public OrderOperation(int tableId, String dish) {
this.tableId = tableId;
this.dish = dish;
}
@Override
public OrderOperationResult execute() {
System.out.println("order's executing startup with orderRequest: " + toString());
//execute order logic
System.out.println("order's executing complete");
OrderOperationResult orderResponse = new OrderOperationResult(tableId, dish, true);
return orderResponse;
}
}

@ -0,0 +1,13 @@
package com.baiye.case5.common.order;
import com.baiye.case5.common.OperationResult;
import lombok.Data;
@Data
public class OrderOperationResult extends OperationResult {
private final int tableId;
private final String dish;
private final boolean complete;
}

@ -0,0 +1,63 @@
package com.baiye.case5.server;
import com.baiye.case5.server.codec.OrderFrameDecoder;
import com.baiye.case5.server.codec.OrderFrameEncoder;
import com.baiye.case5.server.codec.OrderProtocolDecoder;
import com.baiye.case5.server.codec.OrderProtocolEncoder;
import com.baiye.case5.server.handler.OrderServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.util.concurrent.ExecutionException;
/**
*
*/
public class Server {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置 I/O 模式
serverBootstrap.channel(NioServerSocketChannel.class);
// 设置 日志级别
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
// 设置 Reactor 方式
NioEventLoopGroup group = new NioEventLoopGroup();
try{
serverBootstrap.group(group);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new OrderFrameDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new OrderProtocolEncoder());
pipeline.addLast(new OrderProtocolDecoder());
// 设置 日志级别
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new OrderServerProcessHandler());
}
});
// 绑定端口启动
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}

@ -0,0 +1,71 @@
package com.baiye.case5.server;
import com.baiye.case5.server.codec.OrderFrameDecoder;
import com.baiye.case5.server.codec.OrderFrameEncoder;
import com.baiye.case5.server.codec.OrderProtocolDecoder;
import com.baiye.case5.server.codec.OrderProtocolEncoder;
import com.baiye.case5.server.handler.OrderServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.util.concurrent.ExecutionException;
/**
* -
*
* @author q
*/
public class ServerV1 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置 I/O 模式
serverBootstrap.channel(NioServerSocketChannel.class);
// 设置 日志级别
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
// 设置 Reactor 方式
NioEventLoopGroup group = new NioEventLoopGroup();
try{
serverBootstrap.group(group);
// 参数调优1 - 必调
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
// 参数调优2 - 必调
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new OrderFrameDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new OrderProtocolEncoder());
pipeline.addLast(new OrderProtocolDecoder());
// 设置 日志级别
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new OrderServerProcessHandler());
}
});
// 绑定端口启动
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}

@ -0,0 +1,27 @@
package com.baiye.case5.server.codec;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
* 1. -> TCP []
*/
public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
/**
*
*
*
*
*
*
*/
public OrderFrameDecoder() {
super(
Integer.MAX_VALUE,
0,
2,
0,
2
);
}
}

@ -0,0 +1,13 @@
package com.baiye.case5.server.codec;
import io.netty.handler.codec.LengthFieldPrepender;
/**
* 5.
*/
public class OrderFrameEncoder extends LengthFieldPrepender {
public OrderFrameEncoder() {
super(2);
}
}

@ -0,0 +1,22 @@
package com.baiye.case5.server.codec;
import com.baiye.case5.common.RequestMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.List;
/**
* 2. Decoder ByteBuf -> RequestMessage
*
*/
public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> out) throws Exception {
RequestMessage requestMessage = new RequestMessage();
requestMessage.decode(byteBuf);
out.add(requestMessage);
}
}

@ -0,0 +1,22 @@
package com.baiye.case5.server.codec;
import java.util.List;
import com.baiye.case5.common.ResponseMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
/**
* 4. Handler writeAndFlush ResponseMessage -> ByteBuf
*/
public class OrderProtocolEncoder extends MessageToMessageEncoder<ResponseMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, ResponseMessage responseMessage, List<Object> out) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
responseMessage.encode(buffer);
out.add(buffer);
}
}

@ -0,0 +1,28 @@
package com.baiye.case5.server.handler;
import com.baiye.case5.common.Operation;
import com.baiye.case5.common.OperationResult;
import com.baiye.case5.common.RequestMessage;
import com.baiye.case5.common.ResponseMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 3. Handler
*/
public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RequestMessage requestMessage) throws Exception {
Operation operation = requestMessage.getMessageBody();
OperationResult operationResult = operation.execute();
// 发送结果返回回去
ResponseMessage responseMessage = new ResponseMessage();
// 头信息原样返回
responseMessage.setMessageHeader(requestMessage.getMessageHeader());
responseMessage.setMessageBody(operationResult);
// 信息写出去
channelHandlerContext.writeAndFlush(responseMessage);
}
}

@ -0,0 +1,19 @@
package com.baiye.case5.util;
import java.util.concurrent.atomic.AtomicLong;
public final class IdUtil {
public static final AtomicLong IDX = new AtomicLong();
private IdUtil() {
}
/**
* streamId
* @return streamId
*/
public static long nextId(){
return IDX.incrementAndGet();
}
}

@ -0,0 +1,25 @@
package com.baiye.case5.util;
import com.google.gson.Gson;
public final class JsonUtil {
private static final Gson GSON = new Gson();
private JsonUtil() {
}
/**
*
*/
public static <T> T fromJson(String jsonStr, Class<T> clazz){
return GSON.fromJson(jsonStr, clazz);
}
/**
*
*/
public static String toJson(Object obj) {
return GSON.toJson(obj);
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 484 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 261 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 78 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 662 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 341 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 103 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 413 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 143 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 136 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 267 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 222 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 157 KiB

Loading…
Cancel
Save