diff --git a/cdp-iaas/gateway-server/src/main/resources/config/application.yml b/cdp-iaas/gateway-server/src/main/resources/config/application.yml index 1013ce1..36f0ad3 100644 --- a/cdp-iaas/gateway-server/src/main/resources/config/application.yml +++ b/cdp-iaas/gateway-server/src/main/resources/config/application.yml @@ -8,7 +8,7 @@ spring: database: 9 host: 8.130.96.163 port: 6379 - password: + password: datasource: url: jdbc:mysql://${MYSQL_HOST:8.130.96.163}:${MYSQL_PORT:3306}/${MYSQL_DB:gateway}?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true driver-class-name: com.mysql.jdbc.Driver diff --git a/cdp-tools/cdp-tool-dy/pom.xml b/cdp-tools/cdp-tool-dy/pom.xml index ccb3457..1ea6d47 100644 --- a/cdp-tools/cdp-tool-dy/pom.xml +++ b/cdp-tools/cdp-tool-dy/pom.xml @@ -16,6 +16,12 @@ 8 + + dy-tool-livetelecast + dy-tool-member + dy-tool-video + + diff --git a/cdp-tools/cdp-tool-mq/cdp-tool-mq-producer/pom.xml b/cdp-tools/cdp-tool-mq/cdp-tool-mq-producer/pom.xml index 6dc38a0..e18bc9a 100644 --- a/cdp-tools/cdp-tool-mq/cdp-tool-mq-producer/pom.xml +++ b/cdp-tools/cdp-tool-mq/cdp-tool-mq-producer/pom.xml @@ -22,6 +22,7 @@ cdp-common-rocketmq 1.0.0-SNAPSHOT + org.springframework.boot spring-boot-starter-test diff --git a/cdp-tools/cdp-tool-mq/cdp-tool-mq-producer/src/main/java/com/baiye/module/entity/MessageRecord.java b/cdp-tools/cdp-tool-mq/cdp-tool-mq-producer/src/main/java/com/baiye/module/entity/MessageRecord.java index faa4ba0..a7bb12f 100644 --- a/cdp-tools/cdp-tool-mq/cdp-tool-mq-producer/src/main/java/com/baiye/module/entity/MessageRecord.java +++ b/cdp-tools/cdp-tool-mq/cdp-tool-mq-producer/src/main/java/com/baiye/module/entity/MessageRecord.java @@ -33,6 +33,9 @@ public class MessageRecord implements Serializable { @ApiModelProperty(value = "mq消息") private String msg; + @ApiModelProperty(value = "消息状态") + private Integer status; + @ApiModelProperty(value = "消息类型") private Integer msgType; diff --git a/cdp-tools/cdp-tool-mq/cdp-tool-mq-producer/src/main/java/com/baiye/module/service/impl/WechatMessageServiceImpl.java b/cdp-tools/cdp-tool-mq/cdp-tool-mq-producer/src/main/java/com/baiye/module/service/impl/WechatMessageServiceImpl.java index 721f071..4d5fac1 100644 --- a/cdp-tools/cdp-tool-mq/cdp-tool-mq-producer/src/main/java/com/baiye/module/service/impl/WechatMessageServiceImpl.java +++ b/cdp-tools/cdp-tool-mq/cdp-tool-mq-producer/src/main/java/com/baiye/module/service/impl/WechatMessageServiceImpl.java @@ -54,7 +54,7 @@ public class WechatMessageServiceImpl implements WechatMessageService { */ @Override public void analyticalData(WeChatResponse weChatResponse) { - log.info("====接收到微信消息回调:{}=====",weChatResponse); + log.info("====接收到微信消息回调:{}=====", weChatResponse); Integer status = weChatResponse.getStatus(); switch (status) { // 登录回调请求 @@ -69,6 +69,7 @@ public class WechatMessageServiceImpl implements WechatMessageService { case WechatCallbackStatus.PARTIAL: wechatMessage(weChatResponse); break; + default: } } @@ -86,7 +87,9 @@ public class WechatMessageServiceImpl implements WechatMessageService { weChatResponse.setSenderId(wechatMessageContent.getWechat()); weChatResponse.setMessageRecordId(messageRecordId); // 持久化消息 + MessageRecord messageRecord = new MessageRecord(); + messageRecord.setMsg(JSONUtil.toJsonStr(weChatResponse)); messageRecord.setTag(MqConstant.WECHAT_TAG); messageRecord.setId(messageRecordId); @@ -103,7 +106,7 @@ public class WechatMessageServiceImpl implements WechatMessageService { .build(); // 发送消息 chatMessageSource.wechatOutPut().send(message); - }else{ + } else { throw new BadRequestException("============微信聊天消息回调插入失败:=============" + weChatResponse); } } @@ -111,10 +114,10 @@ public class WechatMessageServiceImpl implements WechatMessageService { /** * 微信登录回调处理 */ - public void wechatLogin(WeChatResponse weChatResponse){ + public void wechatLogin(WeChatResponse weChatResponse) { Result result = wechatClient.frontRcvResponse(weChatResponse); - if (result.getCode() != HttpStatus.SUCCESS){ - log.info("========调用微信登录处理请求失败:{},{}==========",result, DateUtil.now()); + if (result.getCode() != HttpStatus.SUCCESS) { + log.info("========调用微信登录处理请求失败:{},{}==========", result, DateUtil.now()); } } } diff --git a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/CdpToolNettyDisruptorApplication.java b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/CdpToolNettyDisruptorApplication.java index 53ee81e..2b3cf90 100644 --- a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/CdpToolNettyDisruptorApplication.java +++ b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/CdpToolNettyDisruptorApplication.java @@ -8,6 +8,7 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; * @author Enzo * @date 2022-7-20 */ + @SpringBootApplication(exclude = DataSourceAutoConfiguration.class) public class CdpToolNettyDisruptorApplication { diff --git a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/app/NettyDisruptorRunner.java b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/app/NettyDisruptorRunner.java index cd48824..937a53c 100644 --- a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/app/NettyDisruptorRunner.java +++ b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/app/NettyDisruptorRunner.java @@ -35,6 +35,7 @@ public class NettyDisruptorRunner implements ApplicationRunner { } RingBufferWorkerPoolFactory.getInstance().initStart (ProducerType.MULTI, 1024 * 1024, new YieldingWaitStrategy(), consumers); + socketServer.startServer(); } } diff --git a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/common/RingBufferWorkerPoolFactory.java b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/common/RingBufferWorkerPoolFactory.java index 231a49e..388f69d 100644 --- a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/common/RingBufferWorkerPoolFactory.java +++ b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/common/RingBufferWorkerPoolFactory.java @@ -30,19 +30,19 @@ public class RingBufferWorkerPoolFactory { private SequenceBarrier sequenceBarrier; public void initStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] consumersList){ - //构造 ringbuffer + //构造 ring buffer this.ringBuffer = RingBuffer.create(type, TranslatorDataWraper::new, bufferSize, waitStrategy); //设置栅栏 this.sequenceBarrier = this.ringBuffer.newBarrier(); - //设置workpool + //设置work pool this.workerPool = new WorkerPool<>(this.ringBuffer, this.sequenceBarrier, null, consumersList); //把所构建的消费者放入池中 Arrays.asList(consumersList).forEach(messageConsumer -> consumers.put(messageConsumer.getConsumerId(), messageConsumer)); //添加sequence this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences()); - //启动workpool + //启动work pool this.workerPool.start(Executors.newFixedThreadPool(10)); } diff --git a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/common/TranslatorDataWapper.java b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/common/TranslatorDataWapper.java index 5ba4acd..3b33970 100644 --- a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/common/TranslatorDataWapper.java +++ b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/common/TranslatorDataWapper.java @@ -19,5 +19,4 @@ public class TranslatorDataWapper { - } diff --git a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/common/TranslatorDataWraper.java b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/common/TranslatorDataWraper.java index 9c23ee6..e68608f 100644 --- a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/common/TranslatorDataWraper.java +++ b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/common/TranslatorDataWraper.java @@ -3,6 +3,10 @@ package com.baiye.common; import io.netty.channel.ChannelHandlerContext; import lombok.Data; +/** + * @author q + * @date + */ @Data public class TranslatorDataWraper { private String msg; diff --git a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/MessageConsumerImpl.java b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/MessageConsumerImpl.java index d5e123e..c1b66c7 100644 --- a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/MessageConsumerImpl.java +++ b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/MessageConsumerImpl.java @@ -5,6 +5,10 @@ import com.baiye.common.TranslatorDataWraper; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; +/** + * @author + * @date + */ @Slf4j public class MessageConsumerImpl extends MessageConsumer { public MessageConsumerImpl(String consumerId) { diff --git a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/SocketServer.java b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/SocketServer.java index 991b77c..1897ceb 100644 --- a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/SocketServer.java +++ b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/SocketServer.java @@ -1,17 +1,15 @@ package com.baiye.server; -import com.baiye.codec.MarshallingCodeCFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.*; +import io.netty.channel.AdaptiveRecvByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -84,8 +82,4 @@ public class SocketServer { log.info("---------socket服务启动成功---------"); } - - public static void main(String[] args) throws InterruptedException { - SocketServer.getInstance().startServer(); - } } diff --git a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/SocketServerHandler.java b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/SocketServerHandler.java index e296287..4a903ef 100644 --- a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/SocketServerHandler.java +++ b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/SocketServerHandler.java @@ -1,14 +1,12 @@ package com.baiye.server; +import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.baiye.request.MessageRequest; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; @@ -22,7 +20,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import static io.netty.handler.codec.http.HttpResponseStatus.*; import static io.netty.handler.codec.http.HttpUtil.isKeepAlive; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; /** * @author Enzo @@ -62,12 +62,12 @@ public class SocketServerHandler extends SimpleChannelInboundHandler { * @throws Exception */ @Override - protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { - if (msg instanceof FullHttpRequest){ + if (msg instanceof FullHttpRequest) { //以http请求形式接入,但是走的是websocket handleHttpRequest(ctx, (FullHttpRequest) msg); - }else if (msg instanceof WebSocketFrame){ + } else if (msg instanceof WebSocketFrame) { //处理websocket客户端的消息 handlerWebSocketFrame(ctx, (WebSocketFrame) msg); } @@ -101,7 +101,7 @@ public class SocketServerHandler extends SimpleChannelInboundHandler { * @throws Exception */ @Override - public void handlerAdded(ChannelHandlerContext ctx) { + public void handlerAdded(ChannelHandlerContext ctx) { log.info("一个客户端连接......" + ctx.channel().remoteAddress() + Thread.currentThread().getName()); } @@ -154,31 +154,39 @@ public class SocketServerHandler extends SimpleChannelInboundHandler { /** * 唯一的一次http请求,用于创建websocket - * */ + */ private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { - //要求Upgrade为websocket,过滤掉get/Post - if (!req.decoderResult().isSuccess() - || (!"websocket".equals(req.headers().get("Upgrade")))) { - //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端 - sendHttpResponse(ctx, req, new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); + // Handle a bad request. + if (!req.decoderResult().isSuccess()) { + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST, Unpooled.EMPTY_BUFFER)); return; } + + // Allow only GET methods. + if (!HttpMethod.GET.equals(req.method())) { + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN, Unpooled.EMPTY_BUFFER)); + return; + } + + // Handshake + //注意,这条地址别被误导了,其实这里填写什么都无所谓,WS协议消息的接收不受这 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( - "ws://localhost:8099/websocket", null, false); + getWebSocketLocation(req), null, true, 5 * 1024 * 1024); handShaker = wsFactory.newHandshaker(req); if (handShaker == null) { - WebSocketServerHandshakerFactory - .sendUnsupportedVersionResponse(ctx.channel()); + WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { + // 通过它构造握手响应消息返回给客户端, + // 同时将WebSocket相关的编码和解码类动态添加到ChannelPipeline中,用于WebSocket消息的编解码, + // 添加WebSocketEncoder和WebSocketDecoder之后,服务端就可以自动对WebSocket消息进行编解码了 handShaker.handshake(ctx.channel(), req); } } /** * 拒绝不合法的请求,并返回错误信息 - * */ + */ private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回应答给客户端 @@ -196,7 +204,7 @@ public class SocketServerHandler extends SimpleChannelInboundHandler { } - private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){ + private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判断是否关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); @@ -219,10 +227,10 @@ public class SocketServerHandler extends SimpleChannelInboundHandler { log.debug("服务端收到:" + request); TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request); - // channelHandlerContext.channel().writeAndFlush(tws); - // 返回【谁发的发给谁】 - ctx.channel().writeAndFlush(tws); + + ctx.channel().writeAndFlush(tws); } + public static Map getChannelMap() { return CHANNEL_MAP; } @@ -234,4 +242,9 @@ public class SocketServerHandler extends SimpleChannelInboundHandler { public static Map getClientMap() { return CLIENT_MAP; } + + private static String getWebSocketLocation(FullHttpRequest req) { + String location = req.headers().get(HttpHeaderNames.HOST) + req.uri(); + return "ws://" + location; + } } diff --git a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/WebsocketChannelInitializer.java b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/WebsocketChannelInitializer.java index d4cd934..193120d 100644 --- a/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/WebsocketChannelInitializer.java +++ b/cdp-tools/cdp-tool-netty-disruptor/src/main/java/com/baiye/server/WebsocketChannelInitializer.java @@ -7,6 +7,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; @@ -33,29 +34,22 @@ public class WebsocketChannelInitializer extends ChannelInitializer 对Java对象转为的二进制数据进行编解码 - pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); - pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); - - /* - 说明 - 1. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合 - 2. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求 - */ - pipeline.addLast(new HttpObjectAggregator(8192)); - /* 说明 + /* 说明 1. 对应websocket ,它的数据是以 帧(frame) 形式传递 2. 可以看到WebSocketFrame 下面有六个子类 3. 浏览器请求时 ws://localhost:7000/msg 表示请求的uri 4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接 5. 是通过一个 状态码 101 */ + pipeline.addLast(new WebSocketServerProtocolHandler(websocketUrl)); - //自定义的handler ,处理业务逻辑 - pipeline.addLast(socketServerHandler); + + } } diff --git a/cdp-tools/cdp-tool-netty-disruptor/src/test/java/com/baiye/CdpToolNettyDisruptorApplicationTests.java b/cdp-tools/cdp-tool-netty-disruptor/src/test/java/com/baiye/CdpToolNettyDisruptorApplicationTests.java deleted file mode 100644 index 4390f5d..0000000 --- a/cdp-tools/cdp-tool-netty-disruptor/src/test/java/com/baiye/CdpToolNettyDisruptorApplicationTests.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.baiye; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; - -@SpringBootTest -class CdpToolNettyDisruptorApplicationTests { - - @Test - void contextLoads() { - } - -} diff --git a/cdp-tools/cdp-tool-wechat/cdp-wechat-api/src/main/java/com/baiye/job/AddFriendJob.java b/cdp-tools/cdp-tool-wechat/cdp-wechat-api/src/main/java/com/baiye/job/AddFriendJob.java index 708f9cf..fbe1aba 100644 --- a/cdp-tools/cdp-tool-wechat/cdp-wechat-api/src/main/java/com/baiye/job/AddFriendJob.java +++ b/cdp-tools/cdp-tool-wechat/cdp-wechat-api/src/main/java/com/baiye/job/AddFriendJob.java @@ -2,6 +2,7 @@ package com.baiye.job; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpStatus; import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONUtil; @@ -17,13 +18,10 @@ import com.baiye.entity.Source; import com.baiye.entity.WeChatSource; import com.baiye.entity.WeChatUser; import com.baiye.properties.WeChatProperties; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.HashMap; diff --git a/cdp-tools/cdp-tool-wechat/cdp-wechat-server/src/main/java/com/baiye/entity/WeChatCallbackLog.java b/cdp-tools/cdp-tool-wechat/cdp-wechat-server/src/main/java/com/baiye/entity/WeChatCallbackLog.java index 426715f..c9eed63 100644 --- a/cdp-tools/cdp-tool-wechat/cdp-wechat-server/src/main/java/com/baiye/entity/WeChatCallbackLog.java +++ b/cdp-tools/cdp-tool-wechat/cdp-wechat-server/src/main/java/com/baiye/entity/WeChatCallbackLog.java @@ -39,6 +39,7 @@ public class WeChatCallbackLog { @ApiModelProperty(value = "标识") private String cusQrUuId; + @Column(name = "create_time") @ApiModelProperty(value = "创建时间") private Date createTime; diff --git a/pom.xml b/pom.xml index 0348427..988098d 100644 --- a/pom.xml +++ b/pom.xml @@ -20,6 +20,7 @@ cdp-entrance cdp-tag-center cdp-customer + @@ -179,8 +180,9 @@ io.netty netty-all - 4.1.6.Final + 4.1.49.Final + com.lmax disruptor