@ -0,0 +1,11 @@
|
||||
package com.baiye.case5.client.handler;
|
||||
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
|
||||
public class ClientIdleCheckHandler extends IdleStateHandler {
|
||||
|
||||
public ClientIdleCheckHandler() {
|
||||
super(0, 5, 0);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
package com.baiye.case5.client.handler;
|
||||
|
||||
import com.baiye.case5.common.RequestMessage;
|
||||
import com.baiye.case5.common.keepalive.KeepaliveOperation;
|
||||
import com.baiye.case5.util.IdUtil;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@ChannelHandler.Sharable
|
||||
public class KeepaliveHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
|
||||
log.info("write idle happen. so need to send keepalive to keep connection not closed by server");
|
||||
KeepaliveOperation keepaliveOperation = new KeepaliveOperation();
|
||||
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), keepaliveOperation);
|
||||
ctx.writeAndFlush(requestMessage);
|
||||
}
|
||||
super.userEventTriggered(ctx, evt);
|
||||
}
|
||||
}
|
@ -0,0 +1,92 @@
|
||||
package com.baiye.case5.server;
|
||||
|
||||
import com.baiye.case5.server.codec.OrderFrameDecoder;
|
||||
import com.baiye.case5.server.codec.OrderFrameEncoder;
|
||||
import com.baiye.case5.server.codec.OrderProtocolDecoder;
|
||||
import com.baiye.case5.server.codec.OrderProtocolEncoder;
|
||||
import com.baiye.case5.server.handler.MetricsHandler;
|
||||
import com.baiye.case5.server.handler.OrderServerProcessHandler;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioChannelOption;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
/**
|
||||
* 服务端 - 参数调优版本 - v2 异步 handler
|
||||
*
|
||||
* @author q
|
||||
*/
|
||||
public class ServerV2 {
|
||||
|
||||
public static void main(String[] args) throws InterruptedException, ExecutionException {
|
||||
|
||||
ServerBootstrap serverBootstrap = new ServerBootstrap();
|
||||
// 设置 I/O 模式
|
||||
serverBootstrap.channel(NioServerSocketChannel.class);
|
||||
// 设置 日志级别
|
||||
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
|
||||
// 设置 Reactor 方式 - 规范需要指定线程名称
|
||||
NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
|
||||
NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
|
||||
|
||||
// 整个程序共享的业务线程池 - 切记不能使用 NioEventLoopGroup, 这样只能使用其中的一个线程
|
||||
UnorderedThreadPoolEventExecutor orderServerProcessThreadPool = new UnorderedThreadPoolEventExecutor(
|
||||
10,
|
||||
new DefaultThreadFactory("OrderServerProcessThreadPool")
|
||||
);
|
||||
|
||||
|
||||
// 共享的 Handler
|
||||
MetricsHandler metricsHandler = new MetricsHandler();
|
||||
try {
|
||||
serverBootstrap.group(boss, worker);
|
||||
// 参数调优1 - 必调
|
||||
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
|
||||
// 参数调优2 - 必调
|
||||
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
|
||||
|
||||
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(NioSocketChannel ch) throws Exception {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
// 调试的时候使用的
|
||||
pipeline.addLast("LoggingHandler-DEBUG", new LoggingHandler(LogLevel.DEBUG));
|
||||
// Handler - 规范一定要指定线程名称
|
||||
pipeline.addLast("OrderFrameDecoder", new OrderFrameDecoder());
|
||||
pipeline.addLast("OrderFrameEncoder", new OrderFrameEncoder());
|
||||
|
||||
pipeline.addLast("OrderProtocolEncoder", new OrderProtocolEncoder());
|
||||
pipeline.addLast("OrderProtocolDecoder", new OrderProtocolDecoder());
|
||||
// 设置 日志级别
|
||||
pipeline.addLast("LoggingHandler-INFO", new LoggingHandler(LogLevel.INFO));
|
||||
// 业务处理 Handler
|
||||
// pipeline.addLast("OrderServerProcessHandler", new OrderServerProcessHandler());
|
||||
// FIXME: 使用独立的线程池进行异步执行逻辑
|
||||
pipeline.addLast(orderServerProcessThreadPool, new OrderServerProcessHandler());
|
||||
// 连接数监测 Handler
|
||||
pipeline.addLast("MetricsHandler", metricsHandler);
|
||||
}
|
||||
});
|
||||
// 绑定端口启动
|
||||
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
|
||||
|
||||
channelFuture.channel().closeFuture().sync();
|
||||
} finally {
|
||||
boss.shutdownGracefully();
|
||||
worker.shutdownGracefully();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,88 @@
|
||||
package com.baiye.case5.server;
|
||||
|
||||
import com.baiye.case5.server.codec.OrderFrameDecoder;
|
||||
import com.baiye.case5.server.codec.OrderFrameEncoder;
|
||||
import com.baiye.case5.server.codec.OrderProtocolDecoder;
|
||||
import com.baiye.case5.server.codec.OrderProtocolEncoder;
|
||||
import com.baiye.case5.server.handler.MetricsHandler;
|
||||
import com.baiye.case5.server.handler.OrderServerProcessHandler;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioChannelOption;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.handler.flush.FlushConsolidationHandler;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
/**
|
||||
* 服务端 - 参数调优版本 - flush 优化
|
||||
*
|
||||
* @author q
|
||||
*/
|
||||
public class ServerV3 {
|
||||
|
||||
public static void main(String[] args) throws InterruptedException, ExecutionException {
|
||||
|
||||
ServerBootstrap serverBootstrap = new ServerBootstrap();
|
||||
// 设置 I/O 模式
|
||||
serverBootstrap.channel(NioServerSocketChannel.class);
|
||||
// 设置 日志级别
|
||||
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
|
||||
// 设置 Reactor 方式 - 规范需要指定线程名称
|
||||
NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
|
||||
NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
|
||||
|
||||
// 共享的 Handler
|
||||
MetricsHandler metricsHandler = new MetricsHandler();
|
||||
try {
|
||||
serverBootstrap.group(boss, worker);
|
||||
// 参数调优1 - 必调
|
||||
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
|
||||
// 参数调优2 - 必调
|
||||
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
|
||||
|
||||
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(NioSocketChannel ch) throws Exception {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
// 调试的时候使用的
|
||||
pipeline.addLast("LoggingHandler-DEBUG", new LoggingHandler(LogLevel.DEBUG));
|
||||
// Handler - 规范一定要指定线程名称
|
||||
pipeline.addLast("OrderFrameDecoder", new OrderFrameDecoder());
|
||||
pipeline.addLast("OrderFrameEncoder", new OrderFrameEncoder());
|
||||
|
||||
pipeline.addLast("OrderProtocolEncoder", new OrderProtocolEncoder());
|
||||
pipeline.addLast("OrderProtocolDecoder", new OrderProtocolDecoder());
|
||||
// 设置 日志级别
|
||||
pipeline.addLast("LoggingHandler-INFO", new LoggingHandler(LogLevel.INFO));
|
||||
// fixme flush 优化版本 - 非共享 handler
|
||||
pipeline.addLast("FlushOrderServerProcessHandler", new FlushConsolidationHandler(
|
||||
// 5次 write 才 flush一次
|
||||
5,
|
||||
// 异步增强打开
|
||||
true
|
||||
));
|
||||
// 连接数监测 Handler
|
||||
pipeline.addLast("MetricsHandler", metricsHandler);
|
||||
}
|
||||
});
|
||||
// 绑定端口启动
|
||||
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
|
||||
|
||||
channelFuture.channel().closeFuture().sync();
|
||||
} finally {
|
||||
boss.shutdownGracefully();
|
||||
worker.shutdownGracefully();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,91 @@
|
||||
package com.baiye.case5.server;
|
||||
|
||||
import com.baiye.case5.server.codec.OrderFrameDecoder;
|
||||
import com.baiye.case5.server.codec.OrderFrameEncoder;
|
||||
import com.baiye.case5.server.codec.OrderProtocolDecoder;
|
||||
import com.baiye.case5.server.codec.OrderProtocolEncoder;
|
||||
import com.baiye.case5.server.handler.MetricsHandler;
|
||||
import com.baiye.case5.server.handler.OrderServerProcessHandler;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioChannelOption;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
|
||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
/**
|
||||
* 服务端 - 参数调优版本 - 流量整形
|
||||
*
|
||||
* @author q
|
||||
*/
|
||||
public class ServerV4 {
|
||||
|
||||
public static void main(String[] args) throws InterruptedException, ExecutionException {
|
||||
|
||||
ServerBootstrap serverBootstrap = new ServerBootstrap();
|
||||
// 设置 I/O 模式
|
||||
serverBootstrap.channel(NioServerSocketChannel.class);
|
||||
// 设置 日志级别
|
||||
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
|
||||
// 设置 Reactor 方式 - 规范需要指定线程名称
|
||||
NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
|
||||
NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
|
||||
|
||||
// 共享的 Handler
|
||||
MetricsHandler metricsHandler = new MetricsHandler();
|
||||
// FIXME: 流量整形 Handler 简易的线程池 + 100MB + 100MB
|
||||
GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(
|
||||
new NioEventLoopGroup(),
|
||||
100 * 1024 * 1024,
|
||||
100 * 1024 * 1024
|
||||
);
|
||||
try {
|
||||
serverBootstrap.group(boss, worker);
|
||||
// 参数调优1 - 必调
|
||||
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
|
||||
// 参数调优2 - 必调
|
||||
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
|
||||
|
||||
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(NioSocketChannel ch) throws Exception {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
// 调试的时候使用的
|
||||
pipeline.addLast("LoggingHandler-DEBUG", new LoggingHandler(LogLevel.DEBUG));
|
||||
// fixme 全局流控
|
||||
pipeline.addLast("GlobalTrafficShapingHandler", globalTrafficShapingHandler);
|
||||
// Handler - 规范一定要指定线程名称
|
||||
pipeline.addLast("OrderFrameDecoder", new OrderFrameDecoder());
|
||||
pipeline.addLast("OrderFrameEncoder", new OrderFrameEncoder());
|
||||
|
||||
pipeline.addLast("OrderProtocolEncoder", new OrderProtocolEncoder());
|
||||
pipeline.addLast("OrderProtocolDecoder", new OrderProtocolDecoder());
|
||||
// 设置 日志级别
|
||||
pipeline.addLast("LoggingHandler-INFO", new LoggingHandler(LogLevel.INFO));
|
||||
|
||||
pipeline.addLast("OrderServerProcessHandler", new OrderServerProcessHandler());
|
||||
// 连接数监测 Handler
|
||||
pipeline.addLast("MetricsHandler", metricsHandler);
|
||||
}
|
||||
});
|
||||
// 绑定端口启动
|
||||
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
|
||||
|
||||
channelFuture.channel().closeFuture().sync();
|
||||
} finally {
|
||||
boss.shutdownGracefully();
|
||||
worker.shutdownGracefully();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,86 @@
|
||||
package com.baiye.case5.server;
|
||||
|
||||
import com.baiye.case5.server.codec.OrderFrameDecoder;
|
||||
import com.baiye.case5.server.codec.OrderFrameEncoder;
|
||||
import com.baiye.case5.server.codec.OrderProtocolDecoder;
|
||||
import com.baiye.case5.server.codec.OrderProtocolEncoder;
|
||||
import com.baiye.case5.server.handler.MetricsHandler;
|
||||
import com.baiye.case5.server.handler.OrderServerProcessHandler;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.epoll.EpollChannelOption;
|
||||
import io.netty.channel.epoll.EpollEventLoopGroup;
|
||||
import io.netty.channel.epoll.EpollServerSocketChannel;
|
||||
import io.netty.channel.epoll.EpollSocketChannel;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
/**
|
||||
* 服务端 - 参数调优版本 - 开启 native
|
||||
*
|
||||
* "Nio" 替换为 "Epoll"
|
||||
*
|
||||
* 只能运行在 Linux 平台
|
||||
*
|
||||
* @author q
|
||||
*/
|
||||
public class ServerV5 {
|
||||
|
||||
public static void main(String[] args) throws InterruptedException, ExecutionException {
|
||||
|
||||
ServerBootstrap serverBootstrap = new ServerBootstrap();
|
||||
// 设置 I/O 模式
|
||||
serverBootstrap.channel(EpollServerSocketChannel.class);
|
||||
// 设置 日志级别
|
||||
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
|
||||
// 设置 Reactor 方式 - 规范需要指定线程名称
|
||||
EpollEventLoopGroup boss = new EpollEventLoopGroup(0, new DefaultThreadFactory("boss"));
|
||||
EpollEventLoopGroup worker = new EpollEventLoopGroup(0, new DefaultThreadFactory("worker"));
|
||||
|
||||
// 共享的 Handler
|
||||
MetricsHandler metricsHandler = new MetricsHandler();
|
||||
try {
|
||||
serverBootstrap.group(boss, worker);
|
||||
// 参数调优1 - 必调
|
||||
serverBootstrap.childOption(EpollChannelOption.TCP_NODELAY, true);
|
||||
// 参数调优2 - 必调
|
||||
serverBootstrap.option(EpollChannelOption.SO_BACKLOG, 1024);
|
||||
|
||||
serverBootstrap.childHandler(new ChannelInitializer<EpollSocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(EpollSocketChannel ch) throws Exception {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
// 调试的时候使用的
|
||||
pipeline.addLast("LoggingHandler-DEBUG", new LoggingHandler(LogLevel.DEBUG));
|
||||
// Handler - 规范一定要指定线程名称
|
||||
pipeline.addLast("OrderFrameDecoder", new OrderFrameDecoder());
|
||||
pipeline.addLast("OrderFrameEncoder", new OrderFrameEncoder());
|
||||
|
||||
pipeline.addLast("OrderProtocolEncoder", new OrderProtocolEncoder());
|
||||
pipeline.addLast("OrderProtocolDecoder", new OrderProtocolDecoder());
|
||||
// 设置 日志级别
|
||||
pipeline.addLast("LoggingHandler-INFO", new LoggingHandler(LogLevel.INFO));
|
||||
|
||||
pipeline.addLast("OrderServerProcessHandler", new OrderServerProcessHandler());
|
||||
// 连接数监测 Handler
|
||||
pipeline.addLast("MetricsHandler", metricsHandler);
|
||||
}
|
||||
});
|
||||
// 绑定端口启动
|
||||
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
|
||||
|
||||
channelFuture.channel().closeFuture().sync();
|
||||
} finally {
|
||||
boss.shutdownGracefully();
|
||||
worker.shutdownGracefully();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,85 @@
|
||||
package com.baiye.case5.server;
|
||||
|
||||
import com.baiye.case5.server.codec.OrderFrameDecoder;
|
||||
import com.baiye.case5.server.codec.OrderFrameEncoder;
|
||||
import com.baiye.case5.server.codec.OrderProtocolDecoder;
|
||||
import com.baiye.case5.server.codec.OrderProtocolEncoder;
|
||||
import com.baiye.case5.server.handler.MetricsHandler;
|
||||
import com.baiye.case5.server.handler.OrderServerProcessHandler;
|
||||
import com.baiye.case5.server.handler.ServerIdleCheckHandler;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioChannelOption;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
/**
|
||||
* 服务端 - 参数调优版本 - 空闲检测
|
||||
*
|
||||
* @author q
|
||||
*/
|
||||
public class ServerV6 {
|
||||
|
||||
public static void main(String[] args) throws InterruptedException, ExecutionException {
|
||||
|
||||
ServerBootstrap serverBootstrap = new ServerBootstrap();
|
||||
// 设置 I/O 模式
|
||||
serverBootstrap.channel(NioServerSocketChannel.class);
|
||||
// 设置 日志级别
|
||||
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
|
||||
// 设置 Reactor 方式 - 规范需要指定线程名称
|
||||
NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
|
||||
NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
|
||||
|
||||
// 共享的 Handler
|
||||
MetricsHandler metricsHandler = new MetricsHandler();
|
||||
try {
|
||||
serverBootstrap.group(boss, worker);
|
||||
// 参数调优1 - 必调
|
||||
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
|
||||
// 参数调优2 - 必调
|
||||
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
|
||||
|
||||
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(NioSocketChannel ch) throws Exception {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
// 调试的时候使用的
|
||||
pipeline.addLast("LoggingHandler-DEBUG", new LoggingHandler(LogLevel.DEBUG));
|
||||
// fixme 加入空闲检测
|
||||
pipeline.addLast("ServerIdleCheckHandler", new ServerIdleCheckHandler());
|
||||
// Handler - 规范一定要指定线程名称
|
||||
pipeline.addLast("OrderFrameDecoder", new OrderFrameDecoder());
|
||||
pipeline.addLast("OrderFrameEncoder", new OrderFrameEncoder());
|
||||
|
||||
pipeline.addLast("OrderProtocolEncoder", new OrderProtocolEncoder());
|
||||
pipeline.addLast("OrderProtocolDecoder", new OrderProtocolDecoder());
|
||||
// 设置 日志级别
|
||||
pipeline.addLast("LoggingHandler-INFO", new LoggingHandler(LogLevel.INFO));
|
||||
|
||||
pipeline.addLast("OrderServerProcessHandler", new OrderServerProcessHandler());
|
||||
// 连接数监测 Handler
|
||||
pipeline.addLast("MetricsHandler", metricsHandler);
|
||||
}
|
||||
});
|
||||
// 绑定端口启动
|
||||
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
|
||||
|
||||
channelFuture.channel().closeFuture().sync();
|
||||
} finally {
|
||||
boss.shutdownGracefully();
|
||||
worker.shutdownGracefully();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,90 @@
|
||||
package com.baiye.case5.server;
|
||||
|
||||
import com.baiye.case5.server.codec.OrderFrameDecoder;
|
||||
import com.baiye.case5.server.codec.OrderFrameEncoder;
|
||||
import com.baiye.case5.server.codec.OrderProtocolDecoder;
|
||||
import com.baiye.case5.server.codec.OrderProtocolEncoder;
|
||||
import com.baiye.case5.server.handler.MetricsHandler;
|
||||
import com.baiye.case5.server.handler.OrderServerProcessHandler;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioChannelOption;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.handler.ipfilter.IpFilterRuleType;
|
||||
import io.netty.handler.ipfilter.IpSubnetFilterRule;
|
||||
import io.netty.handler.ipfilter.RuleBasedIpFilter;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
/**
|
||||
* 服务端 - 安全增强 - 黑白名单
|
||||
*
|
||||
* @author q
|
||||
*/
|
||||
public class ServerV7 {
|
||||
|
||||
public static void main(String[] args) throws InterruptedException, ExecutionException {
|
||||
|
||||
ServerBootstrap serverBootstrap = new ServerBootstrap();
|
||||
// 设置 I/O 模式
|
||||
serverBootstrap.channel(NioServerSocketChannel.class);
|
||||
// 设置 日志级别
|
||||
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
|
||||
// 设置 Reactor 方式 - 规范需要指定线程名称
|
||||
NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
|
||||
NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
|
||||
|
||||
// 共享的 Handler
|
||||
MetricsHandler metricsHandler = new MetricsHandler();
|
||||
// fixme 加入黑白名单 REJECT 标识为黑名单 | 切换 IpFilterRuleType 可以变成白名单
|
||||
IpSubnetFilterRule ipSubnetFilterRule = new IpSubnetFilterRule("127.0.0.1", 8, IpFilterRuleType.ACCEPT);
|
||||
RuleBasedIpFilter ruleBasedIpFilter = new RuleBasedIpFilter(ipSubnetFilterRule);
|
||||
try {
|
||||
serverBootstrap.group(boss, worker);
|
||||
// 参数调优1 - 必调
|
||||
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
|
||||
// 参数调优2 - 必调
|
||||
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
|
||||
|
||||
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(NioSocketChannel ch) throws Exception {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
// 调试的时候使用的
|
||||
pipeline.addLast("LoggingHandler-DEBUG", new LoggingHandler(LogLevel.DEBUG));
|
||||
|
||||
pipeline.addLast("IpFileter", ruleBasedIpFilter);
|
||||
// Handler - 规范一定要指定线程名称
|
||||
pipeline.addLast("OrderFrameDecoder", new OrderFrameDecoder());
|
||||
pipeline.addLast("OrderFrameEncoder", new OrderFrameEncoder());
|
||||
|
||||
pipeline.addLast("OrderProtocolEncoder", new OrderProtocolEncoder());
|
||||
pipeline.addLast("OrderProtocolDecoder", new OrderProtocolDecoder());
|
||||
// 设置 日志级别
|
||||
pipeline.addLast("LoggingHandler-INFO", new LoggingHandler(LogLevel.INFO));
|
||||
|
||||
pipeline.addLast("OrderServerProcessHandler", new OrderServerProcessHandler());
|
||||
// 连接数监测 Handler
|
||||
pipeline.addLast("MetricsHandler", metricsHandler);
|
||||
}
|
||||
});
|
||||
// 绑定端口启动
|
||||
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
|
||||
|
||||
channelFuture.channel().closeFuture().sync();
|
||||
} finally {
|
||||
boss.shutdownGracefully();
|
||||
worker.shutdownGracefully();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
package com.baiye.case5.server.handler;
|
||||
|
||||
import com.baiye.case5.common.Operation;
|
||||
import com.baiye.case5.common.RequestMessage;
|
||||
import com.baiye.case5.common.auth.AuthOperation;
|
||||
import com.baiye.case5.common.auth.AuthOperationResult;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@ChannelHandler.Sharable
|
||||
public class AuthHandler extends SimpleChannelInboundHandler<RequestMessage> {
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, RequestMessage msg) throws Exception {
|
||||
try {
|
||||
Operation operation = msg.getMessageBody();
|
||||
if (operation instanceof AuthOperation) {
|
||||
AuthOperation authOperation = (AuthOperation) operation;
|
||||
AuthOperationResult authOperationResult = authOperation.execute();
|
||||
if (authOperationResult.isPassAuth()) {
|
||||
log.info("pass auth");
|
||||
} else {
|
||||
log.error("fail to auth");
|
||||
ctx.close();
|
||||
}
|
||||
} else {
|
||||
// 第一个请求必须是做授权的
|
||||
log.error("expect first msg is auth");
|
||||
ctx.close();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 异常发生, 直接关闭好了
|
||||
log.error("exception happen for: " + e.getMessage(), e);
|
||||
ctx.close();
|
||||
} finally {
|
||||
// 授权完成后从 pipeline 进行移除
|
||||
ctx.pipeline().remove(this);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
package com.baiye.case5.server.handler;
|
||||
|
||||
import com.codahale.metrics.ConsoleReporter;
|
||||
import com.codahale.metrics.Gauge;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.jmx.JmxReporter;
|
||||
import io.netty.channel.ChannelDuplexHandler;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
|
||||
/**
|
||||
* ChannelDuplexHandler 集成既支持输入又支持输出的 Handler
|
||||
*
|
||||
* @ChannelHandler.Sharable 标识为可共享的 handler 且保护我们的应用程序
|
||||
*/
|
||||
@ChannelHandler.Sharable
|
||||
public class MetricsHandler extends ChannelDuplexHandler {
|
||||
|
||||
/**
|
||||
* 记录当前的连接数
|
||||
*/
|
||||
private AtomicLong totalConnectionNumber = new AtomicLong();
|
||||
|
||||
{
|
||||
// jar - 入口
|
||||
MetricRegistry metricRegistry = new MetricRegistry();
|
||||
// 把 totalConnectionNumber register 进去
|
||||
metricRegistry.register("totalConnectionNumber", (Gauge<Long>) () -> totalConnectionNumber.longValue());
|
||||
|
||||
// Console 的方式进行展示数据
|
||||
ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry).build();
|
||||
// 以 10秒 的时间周期进行打印
|
||||
consoleReporter.start(10, TimeUnit.SECONDS);
|
||||
|
||||
// JMX 的方式进行展示
|
||||
JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
|
||||
jmxReporter.start();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接的创建 + 1
|
||||
*/
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
totalConnectionNumber.incrementAndGet();
|
||||
super.channelActive(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接的释放 - 1
|
||||
*/
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
totalConnectionNumber.decrementAndGet();
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
}
|
@ -0,0 +1,38 @@
|
||||
package com.baiye.case5.server.handler;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
public class ServerIdleCheckHandler extends IdleStateHandler {
|
||||
|
||||
/**
|
||||
* 10s 发送一个心跳检测
|
||||
*/
|
||||
public ServerIdleCheckHandler() {
|
||||
super(10, 0, 0, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接检测
|
||||
*
|
||||
* @param ctx
|
||||
* @param evt
|
||||
* @throws Exception
|
||||
*/
|
||||
@Override
|
||||
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
|
||||
// 断了连接
|
||||
if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) {
|
||||
log.info("idle check happen, so close the connection");
|
||||
ctx.close();
|
||||
return;
|
||||
}
|
||||
|
||||
super.channelIdle(ctx, evt);
|
||||
}
|
||||
}
|
After Width: | Height: | Size: 253 KiB |
After Width: | Height: | Size: 104 KiB |
After Width: | Height: | Size: 481 KiB |
After Width: | Height: | Size: 187 KiB |
After Width: | Height: | Size: 576 KiB |
After Width: | Height: | Size: 186 KiB |
After Width: | Height: | Size: 306 KiB |
After Width: | Height: | Size: 340 KiB |
After Width: | Height: | Size: 211 KiB |
After Width: | Height: | Size: 135 KiB |
After Width: | Height: | Size: 151 KiB |
After Width: | Height: | Size: 194 KiB |
After Width: | Height: | Size: 233 KiB |
After Width: | Height: | Size: 566 KiB |
After Width: | Height: | Size: 331 KiB |
After Width: | Height: | Size: 223 KiB |
After Width: | Height: | Size: 320 KiB |
After Width: | Height: | Size: 426 KiB |
After Width: | Height: | Size: 411 KiB |
After Width: | Height: | Size: 1.0 MiB |
After Width: | Height: | Size: 523 KiB |
After Width: | Height: | Size: 260 KiB |
After Width: | Height: | Size: 508 KiB |
After Width: | Height: | Size: 750 KiB |
After Width: | Height: | Size: 588 KiB |
After Width: | Height: | Size: 244 KiB |
After Width: | Height: | Size: 692 KiB |
After Width: | Height: | Size: 754 KiB |
After Width: | Height: | Size: 284 KiB |
After Width: | Height: | Size: 196 KiB |
After Width: | Height: | Size: 242 KiB |
After Width: | Height: | Size: 134 KiB |
After Width: | Height: | Size: 444 KiB |
After Width: | Height: | Size: 446 KiB |
After Width: | Height: | Size: 397 KiB |
After Width: | Height: | Size: 379 KiB |
After Width: | Height: | Size: 412 KiB |