修改netty与disruptor方式

master
bynt 2 years ago
parent be42f5ed09
commit 1482fa9e42

@ -8,7 +8,7 @@ spring:
database: 9 database: 9
host: 8.130.96.163 host: 8.130.96.163
port: 6379 port: 6379
password: password:
datasource: datasource:
url: jdbc:mysql://${MYSQL_HOST:8.130.96.163}:${MYSQL_PORT:3306}/${MYSQL_DB:gateway}?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true url: jdbc:mysql://${MYSQL_HOST:8.130.96.163}:${MYSQL_PORT:3306}/${MYSQL_DB:gateway}?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true
driver-class-name: com.mysql.jdbc.Driver driver-class-name: com.mysql.jdbc.Driver

@ -16,6 +16,12 @@
<java.version>8</java.version> <java.version>8</java.version>
</properties> </properties>
<modules>
<module>dy-tool-livetelecast</module>
<module>dy-tool-member</module>
<module>dy-tool-video</module>
</modules>
<build> <build>
<plugins> <plugins>
<plugin> <plugin>

@ -22,6 +22,7 @@
<artifactId>cdp-common-rocketmq</artifactId> <artifactId>cdp-common-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version> <version>1.0.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>

@ -33,6 +33,9 @@ public class MessageRecord implements Serializable {
@ApiModelProperty(value = "mq消息") @ApiModelProperty(value = "mq消息")
private String msg; private String msg;
@ApiModelProperty(value = "消息状态")
private Integer status;
@ApiModelProperty(value = "消息类型") @ApiModelProperty(value = "消息类型")
private Integer msgType; private Integer msgType;

@ -54,7 +54,7 @@ public class WechatMessageServiceImpl implements WechatMessageService {
*/ */
@Override @Override
public void analyticalData(WeChatResponse weChatResponse) { public void analyticalData(WeChatResponse weChatResponse) {
log.info("====接收到微信消息回调:{}=====",weChatResponse); log.info("====接收到微信消息回调:{}=====", weChatResponse);
Integer status = weChatResponse.getStatus(); Integer status = weChatResponse.getStatus();
switch (status) { switch (status) {
// 登录回调请求 // 登录回调请求
@ -69,6 +69,7 @@ public class WechatMessageServiceImpl implements WechatMessageService {
case WechatCallbackStatus.PARTIAL: case WechatCallbackStatus.PARTIAL:
wechatMessage(weChatResponse); wechatMessage(weChatResponse);
break; break;
default:
} }
} }
@ -86,7 +87,9 @@ public class WechatMessageServiceImpl implements WechatMessageService {
weChatResponse.setSenderId(wechatMessageContent.getWechat()); weChatResponse.setSenderId(wechatMessageContent.getWechat());
weChatResponse.setMessageRecordId(messageRecordId); weChatResponse.setMessageRecordId(messageRecordId);
// 持久化消息 // 持久化消息
MessageRecord messageRecord = new MessageRecord(); MessageRecord messageRecord = new MessageRecord();
messageRecord.setMsg(JSONUtil.toJsonStr(weChatResponse)); messageRecord.setMsg(JSONUtil.toJsonStr(weChatResponse));
messageRecord.setTag(MqConstant.WECHAT_TAG); messageRecord.setTag(MqConstant.WECHAT_TAG);
messageRecord.setId(messageRecordId); messageRecord.setId(messageRecordId);
@ -103,7 +106,7 @@ public class WechatMessageServiceImpl implements WechatMessageService {
.build(); .build();
// 发送消息 // 发送消息
chatMessageSource.wechatOutPut().send(message); chatMessageSource.wechatOutPut().send(message);
}else{ } else {
throw new BadRequestException("============微信聊天消息回调插入失败:=============" + weChatResponse); throw new BadRequestException("============微信聊天消息回调插入失败:=============" + weChatResponse);
} }
} }
@ -111,10 +114,10 @@ public class WechatMessageServiceImpl implements WechatMessageService {
/** /**
* *
*/ */
public void wechatLogin(WeChatResponse weChatResponse){ public void wechatLogin(WeChatResponse weChatResponse) {
Result<String> result = wechatClient.frontRcvResponse(weChatResponse); Result<String> result = wechatClient.frontRcvResponse(weChatResponse);
if (result.getCode() != HttpStatus.SUCCESS){ if (result.getCode() != HttpStatus.SUCCESS) {
log.info("========调用微信登录处理请求失败:{},{}==========",result, DateUtil.now()); log.info("========调用微信登录处理请求失败:{},{}==========", result, DateUtil.now());
} }
} }
} }

@ -8,6 +8,7 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
* @author Enzo * @author Enzo
* @date 2022-7-20 * @date 2022-7-20
*/ */
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class CdpToolNettyDisruptorApplication { public class CdpToolNettyDisruptorApplication {

@ -35,6 +35,7 @@ public class NettyDisruptorRunner implements ApplicationRunner {
} }
RingBufferWorkerPoolFactory.getInstance().initStart RingBufferWorkerPoolFactory.getInstance().initStart
(ProducerType.MULTI, 1024 * 1024, new YieldingWaitStrategy(), consumers); (ProducerType.MULTI, 1024 * 1024, new YieldingWaitStrategy(), consumers);
socketServer.startServer(); socketServer.startServer();
} }
} }

@ -30,19 +30,19 @@ public class RingBufferWorkerPoolFactory {
private SequenceBarrier sequenceBarrier; private SequenceBarrier sequenceBarrier;
public void initStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] consumersList){ public void initStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] consumersList){
//构造 ringbuffer //构造 ring buffer
this.ringBuffer = RingBuffer.create(type, TranslatorDataWraper::new, bufferSize, waitStrategy); this.ringBuffer = RingBuffer.create(type, TranslatorDataWraper::new, bufferSize, waitStrategy);
//设置栅栏 //设置栅栏
this.sequenceBarrier = this.ringBuffer.newBarrier(); this.sequenceBarrier = this.ringBuffer.newBarrier();
//设置workpool //设置work pool
this.workerPool = new WorkerPool<>(this.ringBuffer, this.sequenceBarrier, null, consumersList); this.workerPool = new WorkerPool<>(this.ringBuffer, this.sequenceBarrier, null, consumersList);
//把所构建的消费者放入池中 //把所构建的消费者放入池中
Arrays.asList(consumersList).forEach(messageConsumer -> consumers.put(messageConsumer.getConsumerId(), messageConsumer)); Arrays.asList(consumersList).forEach(messageConsumer -> consumers.put(messageConsumer.getConsumerId(), messageConsumer));
//添加sequence //添加sequence
this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences()); this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences());
//启动workpool //启动work pool
this.workerPool.start(Executors.newFixedThreadPool(10)); this.workerPool.start(Executors.newFixedThreadPool(10));
} }

@ -3,6 +3,10 @@ package com.baiye.common;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import lombok.Data; import lombok.Data;
/**
* @author q
* @date
*/
@Data @Data
public class TranslatorDataWraper { public class TranslatorDataWraper {
private String msg; private String msg;

@ -5,6 +5,10 @@ import com.baiye.common.TranslatorDataWraper;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/**
* @author
* @date
*/
@Slf4j @Slf4j
public class MessageConsumerImpl extends MessageConsumer { public class MessageConsumerImpl extends MessageConsumer {
public MessageConsumerImpl(String consumerId) { public MessageConsumerImpl(String consumerId) {

@ -1,17 +1,15 @@
package com.baiye.server; package com.baiye.server;
import com.baiye.codec.MarshallingCodeCFactory;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*; import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler; import io.netty.handler.logging.LoggingHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -84,8 +82,4 @@ public class SocketServer {
log.info("---------socket服务启动成功---------"); log.info("---------socket服务启动成功---------");
} }
public static void main(String[] args) throws InterruptedException {
SocketServer.getInstance().startServer();
}
} }

@ -1,14 +1,12 @@
package com.baiye.server; package com.baiye.server;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.baiye.request.MessageRequest; import com.baiye.request.MessageRequest;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*; import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -22,7 +20,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive; import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/** /**
* @author Enzo * @author Enzo
@ -62,12 +62,12 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
* @throws Exception * @throws Exception
*/ */
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) { protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpRequest){ if (msg instanceof FullHttpRequest) {
//以http请求形式接入但是走的是websocket //以http请求形式接入但是走的是websocket
handleHttpRequest(ctx, (FullHttpRequest) msg); handleHttpRequest(ctx, (FullHttpRequest) msg);
}else if (msg instanceof WebSocketFrame){ } else if (msg instanceof WebSocketFrame) {
//处理websocket客户端的消息 //处理websocket客户端的消息
handlerWebSocketFrame(ctx, (WebSocketFrame) msg); handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
} }
@ -101,7 +101,7 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
* @throws Exception * @throws Exception
*/ */
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) { public void handlerAdded(ChannelHandlerContext ctx) {
log.info("一个客户端连接......" + ctx.channel().remoteAddress() + Thread.currentThread().getName()); log.info("一个客户端连接......" + ctx.channel().remoteAddress() + Thread.currentThread().getName());
} }
@ -154,31 +154,39 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
/** /**
* httpwebsocket * httpwebsocket
* */ */
private void handleHttpRequest(ChannelHandlerContext ctx, private void handleHttpRequest(ChannelHandlerContext ctx,
FullHttpRequest req) { FullHttpRequest req) {
//要求Upgrade为websocket过滤掉get/Post // Handle a bad request.
if (!req.decoderResult().isSuccess() if (!req.decoderResult().isSuccess()) {
|| (!"websocket".equals(req.headers().get("Upgrade")))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST, Unpooled.EMPTY_BUFFER));
//若不是websocket方式则创建BAD_REQUEST的req返回给客户端
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return; return;
} }
// Allow only GET methods.
if (!HttpMethod.GET.equals(req.method())) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN, Unpooled.EMPTY_BUFFER));
return;
}
// Handshake
//注意这条地址别被误导了其实这里填写什么都无所谓WS协议消息的接收不受这
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
"ws://localhost:8099/websocket", null, false); getWebSocketLocation(req), null, true, 5 * 1024 * 1024);
handShaker = wsFactory.newHandshaker(req); handShaker = wsFactory.newHandshaker(req);
if (handShaker == null) { if (handShaker == null) {
WebSocketServerHandshakerFactory WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
.sendUnsupportedVersionResponse(ctx.channel());
} else { } else {
// 通过它构造握手响应消息返回给客户端,
// 同时将WebSocket相关的编码和解码类动态添加到ChannelPipeline中用于WebSocket消息的编解码
// 添加WebSocketEncoder和WebSocketDecoder之后服务端就可以自动对WebSocket消息进行编解码了
handShaker.handshake(ctx.channel(), req); handShaker.handshake(ctx.channel(), req);
} }
} }
/** /**
* *
* */ */
private static void sendHttpResponse(ChannelHandlerContext ctx, private static void sendHttpResponse(ChannelHandlerContext ctx,
FullHttpRequest req, DefaultFullHttpResponse res) { FullHttpRequest req, DefaultFullHttpResponse res) {
// 返回应答给客户端 // 返回应答给客户端
@ -196,7 +204,7 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
} }
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){ private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 判断是否关闭链路的指令 // 判断是否关闭链路的指令
if (frame instanceof CloseWebSocketFrame) { if (frame instanceof CloseWebSocketFrame) {
handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
@ -219,10 +227,10 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
log.debug("服务端收到:" + request); log.debug("服务端收到:" + request);
TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()
+ ctx.channel().id() + "" + request); + ctx.channel().id() + "" + request);
// channelHandlerContext.channel().writeAndFlush(tws);
// 返回【谁发的发给谁】 ctx.channel().writeAndFlush(tws);
ctx.channel().writeAndFlush(tws);
} }
public static Map<String, Channel> getChannelMap() { public static Map<String, Channel> getChannelMap() {
return CHANNEL_MAP; return CHANNEL_MAP;
} }
@ -234,4 +242,9 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
public static Map<String, Long> getClientMap() { public static Map<String, Long> getClientMap() {
return CLIENT_MAP; return CLIENT_MAP;
} }
private static String getWebSocketLocation(FullHttpRequest req) {
String location = req.headers().get(HttpHeaderNames.HOST) + req.uri();
return "ws://" + location;
}
} }

@ -7,6 +7,7 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -33,29 +34,22 @@ public class WebsocketChannelInitializer extends ChannelInitializer<SocketChanne
ChannelPipeline pipeline = socketChannel.pipeline(); ChannelPipeline pipeline = socketChannel.pipeline();
//因为基于http协议使用http的编码和解码器 //因为基于http协议使用http的编码和解码器
pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpServerCodec());
// http 消息聚合器 512*1024为接收的最大content-length
pipeline.addLast(new HttpObjectAggregator(8192));
//是以块方式写添加ChunkedWriteHandler处理器 //是以块方式写添加ChunkedWriteHandler处理器
pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new ChunkedWriteHandler());
/*
// 进行设置编解码 -> 对Java对象转为的二进制数据进行编解码
pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
/*
1. http, HttpObjectAggregator
2. http
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/*
1. websocket (frame) 1. websocket (frame)
2. WebSocketFrame 2. WebSocketFrame
3. ws://localhost:7000/msg 表示请求的uri 3. ws://localhost:7000/msg 表示请求的uri
4. WebSocketServerProtocolHandler http ws , 4. WebSocketServerProtocolHandler http ws ,
5. 101 5. 101
*/ */
pipeline.addLast(new WebSocketServerProtocolHandler(websocketUrl)); pipeline.addLast(new WebSocketServerProtocolHandler(websocketUrl));
//自定义的handler ,处理业务逻辑
pipeline.addLast(socketServerHandler);
} }
} }

@ -1,13 +0,0 @@
package com.baiye;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class CdpToolNettyDisruptorApplicationTests {
@Test
void contextLoads() {
}
}

@ -2,6 +2,7 @@ package com.baiye.job;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpStatus; import cn.hutool.http.HttpStatus;
import cn.hutool.http.HttpUtil; import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
@ -17,13 +18,10 @@ import com.baiye.entity.Source;
import com.baiye.entity.WeChatSource; import com.baiye.entity.WeChatSource;
import com.baiye.entity.WeChatUser; import com.baiye.entity.WeChatUser;
import com.baiye.properties.WeChatProperties; import com.baiye.properties.WeChatProperties;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations; import org.springframework.data.redis.core.ValueOperations;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap; import java.util.HashMap;

@ -39,6 +39,7 @@ public class WeChatCallbackLog {
@ApiModelProperty(value = "标识") @ApiModelProperty(value = "标识")
private String cusQrUuId; private String cusQrUuId;
@Column(name = "create_time") @Column(name = "create_time")
@ApiModelProperty(value = "创建时间") @ApiModelProperty(value = "创建时间")
private Date createTime; private Date createTime;

@ -20,6 +20,7 @@
<module>cdp-entrance</module> <module>cdp-entrance</module>
<module>cdp-tag-center</module> <module>cdp-tag-center</module>
<module>cdp-customer</module> <module>cdp-customer</module>
</modules> </modules>
@ -179,8 +180,9 @@
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
<artifactId>netty-all</artifactId> <artifactId>netty-all</artifactId>
<version>4.1.6.Final</version> <version>4.1.49.Final</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.lmax</groupId> <groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId> <artifactId>disruptor</artifactId>

Loading…
Cancel
Save