企业微信发送消息

master
bynt 2 years ago
parent 00392d228d
commit 19cc177d9b

@ -0,0 +1,52 @@
package com.baiye.core.base;
import com.baiye.core.base.api.Result;
import com.baiye.core.base.api.ResultCode;
import com.baiye.core.constant.MessageConstant;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.Getter;
import java.io.Serializable;
/**
* @author Enzo
* @date : 2022/8/11
*/
@Data
@Getter
@ApiModel(value = "websocket响应")
public class WebSocketResponse<T> implements Serializable {
@ApiModelProperty(value = "状态码", required = true)
private int code;
@ApiModelProperty(value = "类型", required = true)
private String cmd;
@ApiModelProperty(value = "业务数据")
@JsonInclude(JsonInclude.Include.NON_NULL)
private Object data;
public WebSocketResponse(int code, T data, String cmd) {
this.code = code;
this.cmd = cmd;
this.data = data;
}
public static <T> WebSocketResponse<T> data(T data) {
return data(data, MessageConstant.DEFAULT_SUCCESS_MESSAGE);
}
public static <T> WebSocketResponse<T> data(T data, String msg) {
return data(ResultCode.SUCCESS.getCode(), data, msg);
}
public static <T> WebSocketResponse<T> data(int code, T data, String cmd) {
return new WebSocketResponse<>(code, data, cmd);
}
}

@ -0,0 +1,32 @@
package com.baiye.core.base.api;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author Enzo
* @date : 2022/8/11
*/
@Getter
@AllArgsConstructor
public enum WebsocketCode {
/**
*
*/
CHAT_SEND(1, "Chat.send"),
/**
*
*/
CHAT_MSG(2, "Chat.msg");
/**
*
*/
final int code;
/**
*
*/
final String msg;
}

@ -0,0 +1,17 @@
package com.baiye.core.constant;
/**
* @author Enzo
* @date : 2022/8/9
*/
public class AdPlatFormConstants {
private AdPlatFormConstants() {
}
public static final String AD_PLATFORM = "ad-platform";
public static final String AD_PLATFORM_CALLBACK = "https://baiyee.vip/wechat/callback";
}

@ -138,4 +138,9 @@ public class CacheKey {
public final String USER_CACHE_KEY_PRE = "user::username:";
public final String ONLINE_USER = "online:user:";
/**
* id
*/
public final String MESSAGE_KEY = "message:id:";
}

@ -0,0 +1,13 @@
package com.baiye.core.constant;
/**
* @author Enzo
* @date : 2022/8/10
*/
public class EncryptionConstants {
private EncryptionConstants(){
}
public static final String ENCRYPTION_KEY = "93PmJkwaIOMYSJDB";
}

@ -0,0 +1,48 @@
package com.baiye.core.util;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.crypto.Mode;
import cn.hutool.crypto.Padding;
import cn.hutool.crypto.symmetric.AES;
/**
* @author Enzo
* @date : 2022/8/10
*/
public class AesEncrypt {
private AesEncrypt() {
//构造器
}
/**
*
*
* @param content
* @param key
* @return
*/
public static String encrypt(String content, String key) {
//加密为16进制表示
return getAes(key).encryptHex(content);
}
/**
*
*
* @param encryptContent
* @param key
* @return
*/
public static String decrypt(String encryptContent, String key) {
// 解密为字符串
return getAes(key).decryptStr(encryptContent, CharsetUtil.CHARSET_UTF_8);
}
private static AES getAes(String key) {
//构建
return new AES(Mode.ECB, Padding.PKCS5Padding, key.getBytes());
}
}

@ -25,27 +25,11 @@
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
</dependencies>
<!-- <build>-->
<!-- <plugins>-->
<!-- <plugin>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-maven-plugin</artifactId>-->
<!-- </plugin>-->
<!-- </plugins>-->
<!-- </build>-->
</project>

@ -1,8 +1,4 @@
server:
port: 7070
spring:
application:
name: auth-server
datasource:
hikari:
#指定校验连接合法性执行的sql语句
@ -28,7 +24,4 @@ spring:
file-extension: yml
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
profiles:
active: dev
ip:
local-parsing: true

@ -0,0 +1,29 @@
spring:
datasource:
hikari:
#指定校验连接合法性执行的sql语句
connection-test-query: select 1
auto-commit: true
#注意的是这个地方的定义需要用在DataSourceConfig.java里面使用是因为在定义JobEventConfig.java的时候需要引入自定义dataSource
password: yuyou@RDS2020
jdbc-url: jdbc:mysql://rm-bp1693kl5d490o5cn.mysql.rds.aliyuncs.com:3306/db_backstage?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull
username: root
redis:
#数据库索引
database: 9
host: 172.16.69.134
port: 6379
#连接超时时间
timeout: 5000
cloud:
nacos:
# server-addr: 8.130.96.163:8848
server-addr: 172.16.69.134:8848
config:
server-addr: ${spring.cloud.nacos.server-addr}
file-extension: yml
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
ip:
local-parsing: true

@ -0,0 +1,9 @@
server:
port: 7070
spring:
application:
name: auth-server
profiles:
active: dev
ip:
local-parsing: true

@ -0,0 +1,76 @@
spring:
redis:
#数据库索引
database: 9
host: 8.130.96.163
port: 6379
password:
datasource:
url: jdbc:mysql://${MYSQL_HOST:8.130.96.163}:${MYSQL_PORT:3306}/${MYSQL_DB:gateway}?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true
driver-class-name: com.mysql.jdbc.Driver
username: ${MYSQL_USER:root}
password: ${MYSQL_PASSWORD:y7z7noq2}
cloud:
gateway:
discovery:
locator:
enabled: true
lower-case-service-id: true # admin-service ADMIN-SERVICE /admin-service/** -> 微服务 (ADMIN-SERVICE) 自动转发,忽略大小写
routes:
- id: backstage-server
uri: lb://backstage-server # 负载均衡转发到哪个目的地
predicates:
- Path=/sys/**
filters:
- StripPrefix=1
- id: cdp-wechat-api
uri: lb://cdp-wechat-api
predicates:
- Path=/api-wechat/**
filters:
- StripPrefix=1
- id: cdp-entrance-api
uri: lb://cdp-entrance-api
predicates:
- Path=/api-file/**
filters:
- StripPrefix=1
nacos:
server-addr: 8.130.96.163:8848
# server-addr: 101.35.109.129
config:
server-addr: ${spring.cloud.nacos.server-addr}
file-extension: yml
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# sentinel:
# datasource:
# # 使用动态规则的数据源 - 把 gw-flow.json| api-group.json 文件放在 Nacos 中配置
# # 使用 Nacos 在持久化我们的配置时候要添加一个依赖 sentinel-datasource-nacos
# ds1.nacos:
# serverAddr: nacos-server:8848
# dataId: gw-flow
# ruleType: gw_flow
# ds2.nacos:
# serverAddr: nacos-server:8848
# dataId: api-group
# ruleType: gw_api_group
# ds1.file:
# file: classpath:gw-flow.json # 网关 + API分组的限流
# ruleType: gw_flow
# ds2.file:
# file: classpath:api-group.json
# ruleType: gw_api_group
#
# 配置 sentinel-dashboard
# 测试环境只能使用本机的环境,直接使用线上是不行的
# 配置好之后,取代上面的配置方式,直接使用可视化的方式来替代json配置文件的方式
# sentinel-dashboard 的规则定义好了后我们的网关能立马的感知到生效nacos无法感知但是我们下次重启会丢失规则,所以需要把持久化的规则手动添加到Nacos中
transport:
dashboard: sentinel-server:8858

@ -0,0 +1,81 @@
server:
port: 80
spring:
application:
name: @artifactId@
redis:
#数据库索引
database: 9
host: 172.16.69.134
port: 6379
password:
datasource:
url: jdbc:mysql://${MYSQL_HOST:rm-bp1693kl5d490o5cn.mysql.rds.aliyuncs.com}:${MYSQL_PORT:3306}/${MYSQL_DB:gateway}?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true
driver-class-name: com.mysql.jdbc.Driver
username: ${MYSQL_USER:root}
password: ${MYSQL_PASSWORD:yuyou@RDS2020}
cloud:
gateway:
discovery:
locator:
enabled: true
lower-case-service-id: true # admin-service ADMIN-SERVICE /admin-service/** -> 微服务 (ADMIN-SERVICE) 自动转发,忽略大小写
routes:
- id: backstage-server
uri: lb://backstage-server # 负载均衡转发到哪个目的地
predicates:
- Path=/sys/**
filters:
- StripPrefix=1
- id: cdp-wechat-api
uri: lb://cdp-wechat-api
predicates:
- Path=/api-wechat/**
filters:
- StripPrefix=1
- id: cdp-entrance-api
uri: lb://cdp-entrance-api
predicates:
- Path=/api-file/**
filters:
- StripPrefix=1
nacos:
server-addr: 172.16.69.134:8848
# server-addr: 101.35.109.129:8848
config:
server-addr: ${spring.cloud.nacos.server-addr}
file-extension: yml
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# sentinel:
# datasource:
# # 使用动态规则的数据源 - 把 gw-flow.json| api-group.json 文件放在 Nacos 中配置
# # 使用 Nacos 在持久化我们的配置时候要添加一个依赖 sentinel-datasource-nacos
# ds1.nacos:
# serverAddr: nacos-server:8848
# dataId: gw-flow
# ruleType: gw_flow
# ds2.nacos:
# serverAddr: nacos-server:8848
# dataId: api-group
# ruleType: gw_api_group
# ds1.file:
# file: classpath:gw-flow.json # 网关 + API分组的限流
# ruleType: gw_flow
# ds2.file:
# file: classpath:api-group.json
# ruleType: gw_api_group
#
# 配置 sentinel-dashboard
# 测试环境只能使用本机的环境,直接使用线上是不行的
# 配置好之后,取代上面的配置方式,直接使用可视化的方式来替代json配置文件的方式
# sentinel-dashboard 的规则定义好了后我们的网关能立马的感知到生效nacos无法感知但是我们下次重启会丢失规则,所以需要把持久化的规则手动添加到Nacos中
transport:
dashboard: sentinel-server:8858

@ -3,78 +3,8 @@ server:
spring:
application:
name: @artifactId@
redis:
#数据库索引
database: 9
host: 8.130.96.163
port: 6379
password:
datasource:
url: jdbc:mysql://${MYSQL_HOST:8.130.96.163}:${MYSQL_PORT:3306}/${MYSQL_DB:gateway}?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true
driver-class-name: com.mysql.jdbc.Driver
username: ${MYSQL_USER:root}
password: ${MYSQL_PASSWORD:y7z7noq2}
cloud:
gateway:
discovery:
locator:
enabled: true
lower-case-service-id: true # admin-service ADMIN-SERVICE /admin-service/** -> 微服务 (ADMIN-SERVICE) 自动转发,忽略大小写
routes:
- id: backstage-server
uri: lb://backstage-server # 负载均衡转发到哪个目的地
predicates:
- Path=/sys/**
filters:
- StripPrefix=1
- id: cdp-wechat-api
uri: lb://cdp-wechat-api
predicates:
- Path=/api-wechat/**
filters:
- StripPrefix=1
- id: cdp-entrance-api
uri: lb://cdp-entrance-api
predicates:
- Path=/api-file/**
filters:
- StripPrefix=1
nacos:
server-addr: 8.130.96.163:8848
# server-addr: 101.35.109.129
config:
server-addr: ${spring.cloud.nacos.server-addr}
file-extension: yml
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# sentinel:
# datasource:
# # 使用动态规则的数据源 - 把 gw-flow.json| api-group.json 文件放在 Nacos 中配置
# # 使用 Nacos 在持久化我们的配置时候要添加一个依赖 sentinel-datasource-nacos
# ds1.nacos:
# serverAddr: nacos-server:8848
# dataId: gw-flow
# ruleType: gw_flow
# ds2.nacos:
# serverAddr: nacos-server:8848
# dataId: api-group
# ruleType: gw_api_group
# ds1.file:
# file: classpath:gw-flow.json # 网关 + API分组的限流
# ruleType: gw_flow
# ds2.file:
# file: classpath:api-group.json
# ruleType: gw_api_group
#
# 配置 sentinel-dashboard
# 测试环境只能使用本机的环境,直接使用线上是不行的
# 配置好之后,取代上面的配置方式,直接使用可视化的方式来替代json配置文件的方式
# sentinel-dashboard 的规则定义好了后我们的网关能立马的感知到生效nacos无法感知但是我们下次重启会丢失规则,所以需要把持久化的规则手动添加到Nacos中
transport:
dashboard: sentinel-server:8858
profiles:
active: dev
security:
decode:
private-key: MIICeQIBADANBgkqhkiG9w0BAQEFAASCAmMwggJfAgEAAoGBAKmB93JM4cIFilnvcV1gSuPc3GlFUwo3JGGbOTkL9JP0W9Vz5CsbYbVbEtU+gnw6m3//4DSmOYeqlJJzsylVLdSe4flx2MsCceXxudsE4dHKqYU6zxBvsZB+FNFRoA2kjAtSJDaBCRryP5zXYVUYtVZW16pHkco/I35NwDevkvsJAgMBAAECgYEAkb5SoO9jyOv0mgFbirrJiYQT61uU25qv6FCLcfZKk5AuCai8jSfc6g7lcX5uuRu3KeY6w2oM2w4a8pUSfb5kSR4SAzSL6A94GIKfr3FDBxhrMxB+ko5vDoOwmmDi6jLGUFhxpiwT2Dz2c/Jz2kZJVb6RetOhh8fzdh3cz8jNQoECQQDg3+rX4d7WTS8xIFf3aQ8UKSCyDYpoNCVEsh9jaCsLrhsNXHMw8IX4KTjxsMvf3djztYuNsULHtwMkywGOA1ixAkEAwPg00MkqhBH1we4N8MCnXajZ+/UWch5b40QFBZrcbvx2k/CI25xH52FccmsEvVfuv1uFPto4sPTn7zAcKOzd2QJBANNEK3tkvhkx7ItH6r0LW9pCBXCiXg0mKHlQq0KWRBK2IL49xA8PeZ27BL+0rOWGpGPahOC6z5WYMADqJGzKYhECQQC1xZjKy5QXEGv1n+7VyERfv3yHEJlyKhLYg+xPGU9n5L02IjBo3lAFLvtMdG7khP5Kl0VurVUVq70G3MTJ8EYRAkEAm2hy1mCTMTWJYSlXieWnZxJaXn0MOrv4RyRsoXUTH1tgZ6MCqm0a7YxU0byPU0h1W2kLXXZa8P+vu2finQkvCA==

@ -17,8 +17,7 @@ public interface IRemoteWechatService {
/**
*
* @return
* @param userId
*/
@GetMapping(value = "/wechat/getWxId")
List<String> getWechatId(@RequestParam("userId") Long userId);
List<String> getWechatId();
}

@ -116,5 +116,9 @@ public interface IUserService extends IService<User> {
*/
UserSmallDto getUserDetails(String username);
/**
*
* @return
*/
Map<String,Object> getCurrentUserInfo();
}

@ -496,7 +496,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IU
List<String> roles = authorities.stream().map(GrantedAuthority::getAuthority).collect(Collectors.toList());
map.put("roles", roles);
List<String> wechatIds =
remoteWechatService.getWechatId(userId);
remoteWechatService.getWechatId();
map.put("wechatIds", wechatIds);
return map;
}

@ -3,8 +3,8 @@ spring:
cloud:
nacos:
discovery:
# server-addr: ${NACOS_HOST:8.130.96.163}:${NACOS_PORT:8848}
server-addr: ${NACOS_HOST:8.130.96.163}:${NACOS_PORT:8848}
# server-addr: ${NACOS_HOST:101.35.109.129}:${NACOS_PORT:8848}
redis:
database: 9
host: 8.130.96.163

@ -3,18 +3,18 @@ spring:
cloud:
nacos:
discovery:
server-addr: ${NACOS_HOST:172.16.190.245}:${NACOS_PORT:8848}
server-addr: ${NACOS_HOST:172.16.69.134}:${NACOS_PORT:8848}
redis:
database: 2
host: 8.130.96.163
host: 172.16.69.134
timeout: 5000
datasource:
druid:
db-type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://172.16.190.245:3306/ad_platform?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull
url: jdbc:mysql://rm-bp1693kl5d490o5cn.mysql.rds.aliyuncs.com:3306/db_backstage?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull
username: root
password: baiye@RDS2022
password: yuyou@RDS2020
# 初始连接数
initial-size: 5
# 最小连接数

@ -43,7 +43,7 @@
<dependency>
<groupId>com.baiye</groupId>
<artifactId>cdp-wechat-core</artifactId>
<artifactId>cdp-wechat-server</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
@ -71,13 +71,25 @@
</dependencies>
<!-- 配置插件 -->
<build>
<plugins>
<!-- springboot 打包插件 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.baiye.CdpToolMqProducerApplication</mainClass>
<layout>ZIP</layout>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -17,7 +17,7 @@ import org.springframework.cloud.stream.annotation.EnableBinding;
@SpringBootApplication
@EnableFeignClients
@EnableDiscoveryClient
@MapperScan("com.baiye.module.dao")
@MapperScan("com.baiye.dao")
public class CdpToolMqProducerApplication {
public static void main(String[] args) {

@ -1,30 +0,0 @@
package com.baiye.config;
import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import org.apache.ibatis.reflection.MetaObject;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class MyBatisHandler implements MetaObjectHandler {
/**
*
* @param metaObject
*/
@Override
public void insertFill(MetaObject metaObject) {
this.setFieldValByName("createTime",new Date(),metaObject);
this.setFieldValByName("updateTime",new Date(),metaObject);
}
/**
*
* @param metaObject
*/
@Override
public void updateFill(MetaObject metaObject) {
this.setFieldValByName("updateTime",new Date(),metaObject);
}
}

@ -1,11 +1,13 @@
package com.baiye.feign.wechat;
import com.baiye.core.base.api.Result;
import com.baiye.core.constant.SecurityConstants;
import com.baiye.core.constant.WeChatResponse;
import com.baiye.feign.wechat.fallback.WechatClientFallBack;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
@FeignClient(value = "cdp-wechat-api",fallback = WechatClientFallBack.class)
public interface WechatClient {
@ -13,5 +15,5 @@ public interface WechatClient {
String PAY_PREFIX = "/wechat";
@PostMapping(PAY_PREFIX+ "/login")
Result<String> frontRcvResponse(@RequestBody WeChatResponse weChatResponse);
Result<String> frontRcvResponse(@RequestBody WeChatResponse weChatResponse, @RequestHeader(SecurityConstants.FROM) String from);
}

@ -5,10 +5,15 @@ import com.baiye.core.constant.WeChatResponse;
import com.baiye.feign.wechat.WechatClient;
import org.springframework.stereotype.Component;
/**
* @author Enzo
* @date 2022-8-11
*/
@Component
public class WechatClientFallBack implements WechatClient {
@Override
public Result<String> frontRcvResponse(WeChatResponse weChatResponse) {
public Result<String> frontRcvResponse(WeChatResponse weChatResponse, String from) {
return null;
}
}

@ -1,9 +1,16 @@
package com.baiye.listener;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.baiye.constant.RedisConstant;
import com.baiye.core.constant.WeChatResponse;
import com.baiye.core.util.RedisUtils;
import com.baiye.dto.MessageRecordDTO;
import com.baiye.dto.TransmissionMessageDTO;
import com.baiye.entity.MessageRecord;
import com.baiye.service.MessageRecordService;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
@ -14,27 +21,39 @@ import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
* MQ
* @author Enzo
* @date 2022-8-10
* MQ
*/
@Component
@RequiredArgsConstructor
@RocketMQTransactionListener(txProducerGroup = "chat-msg")
public class ChatMessageListenerImpl implements RocketMQLocalTransactionListener {
@Resource
private RedisUtils redisUtils;
private final RedisUtils redisUtils;
private final MessageRecordService messageRecordService;
@Override
@Transactional(rollbackFor = Exception.class)
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
WeChatResponse weChatResponse = JSON.parseObject(new String((byte[]) msg.getPayload()), WeChatResponse.class);
String key = RedisConstant.REDIS_WT_MRD + weChatResponse.getMessageRecordId();
String value = (String)redisUtils.get(key);
if (value != null){
return RocketMQLocalTransactionState.COMMIT;
}else {
TransmissionMessageDTO messageDTO = JSONUtil.toBean
(new String((byte[]) msg.getPayload()), TransmissionMessageDTO.class);
if (messageDTO != null && messageDTO.getId() != null) {
MessageRecord messageRecord = MessageRecord.builder().build();
BeanUtil.copyProperties(messageDTO, messageRecord);
boolean saveResult =
messageRecordService.save(messageRecord);
if (saveResult) {
// 保存数据
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
// 重试机制
return RocketMQLocalTransactionState.UNKNOWN;
}
/**
@ -42,7 +61,13 @@ public class ChatMessageListenerImpl implements RocketMQLocalTransactionListener
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
return RocketMQLocalTransactionState.COMMIT;
MessageRecord messageRecord =
JSONUtil.toBean(new String((byte[]) msg.getPayload()), MessageRecord.class);
if (messageRecord != null && messageRecord.getId() != null) {
return messageRecordService.getById(messageRecord.getId()) != null
? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}

@ -0,0 +1,24 @@
package com.baiye.module.controller;
import com.baiye.entity.MessageRecord;
import com.baiye.listener.ChatMessageSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Random;
@RestController
@RequestMapping("/demo01")
public class Demo01Controller {
private Logger logger = LoggerFactory.getLogger(getClass());
}

@ -7,6 +7,10 @@ import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
* @author Enzo
* @date 2022-8-11
*/
@RestController
@RequestMapping("/wechat")
public class WechatMessageController {

@ -1,9 +0,0 @@
package com.baiye.module.dao;
import com.baiye.entity.MessageRecord;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface MessageRecordMapper extends BaseMapper<MessageRecord> {
}

@ -1,8 +1,14 @@
package com.baiye.module.service;
import com.baiye.core.constant.WeChatResponse;
import com.baiye.entity.MessageRecord;
import com.baomidou.mybatisplus.extension.service.IService;
public interface WechatMessageService {
/**
* @author Enzo
* @date 2022-8-9
*/
public interface WechatMessageService extends IService<MessageRecord> {
/**
*

@ -1,23 +1,29 @@
package com.baiye.module.service.impl;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.baiye.constant.MqConstant;
import com.baiye.constant.RedisConstant;
import com.baiye.core.base.api.Result;
import com.baiye.core.constant.HttpStatus;
import com.baiye.core.constant.WeChatResponse;
import com.baiye.core.constant.WechatCallbackStatus;
import com.baiye.core.constant.*;
import com.baiye.core.util.RedisUtils;
import com.baiye.dao.MessageRecordMapper;
import com.baiye.dto.TransmissionMessageDTO;
import com.baiye.dto.WeChatUserLoginDTO;
import com.baiye.entity.EnterpriseWeChatUser;
import com.baiye.entity.MessageRecord;
import com.baiye.entity.PersonalWeChatUser;
import com.baiye.entity.WechatMessageContent;
import com.baiye.exception.global.BadRequestException;
import com.baiye.feign.wechat.WechatClient;
import com.baiye.listener.ChatMessageSource;
import com.baiye.module.dao.MessageRecordMapper;
import com.baiye.module.service.WechatMessageService;
import com.baiye.service.PersonalWeChatUserService;
import com.baiye.service.WeChatUserService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.annotation.Value;
@ -26,35 +32,41 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
@Service
/**
* @author yqy
* @date 2022-8-9
*/
@Slf4j
public class WechatMessageServiceImpl implements WechatMessageService {
@Service
@RequiredArgsConstructor
public class WechatMessageServiceImpl extends
ServiceImpl<MessageRecordMapper, MessageRecord> implements WechatMessageService {
private final RedisUtils redisUtils;
private final WechatClient wechatClient;
private final WeChatUserService weChatUserService;
@Resource
private ChatMessageSource chatMessageSource;
@Resource
private MessageRecordMapper messageRecordMapper;
@Resource
private RedisUtils redisUtils;
@Resource
private WechatClient wechatClient;
private final ChatMessageSource chatMessageSource;
private final MessageRecordMapper messageRecordMapper;
private final PersonalWeChatUserService personalWeChatUserService;
@Value("${snowflake.workerId}")
private int workerId;
@Value("${snowflake.datacenterId}")
private int datacenterId;
@Value("${spring.cloud.stream.bindings.wechat-output.destination}")
private String topic;
/**
*
*/
@Override
public void analyticalData(WeChatResponse weChatResponse) {
log.info("====接收到微信消息回调:{}=====", weChatResponse);
log.info("=================== the response as {} ===================", JSONUtil.toJsonStr(weChatResponse));
Integer status = weChatResponse.getStatus();
switch (status) {
// 登录回调请求
@ -78,33 +90,44 @@ public class WechatMessageServiceImpl implements WechatMessageService {
*/
@Transactional(rollbackFor = Exception.class)
public void wechatMessage(WeChatResponse weChatResponse) {
log.info("==================== the weChatResponse as {} ====================", JSONUtil.toJsonStr(weChatResponse));
Long messageRecordId = IdUtil.getSnowflake(workerId, datacenterId).nextId();
// 解析JSON
JSONArray array = JSONUtil.parseArray(weChatResponse.getData());
List<WechatMessageContent> wechatMessageContentList = JSONUtil.toList(array, WechatMessageContent.class);
WechatMessageContent wechatMessageContent = wechatMessageContentList.get(0);
// 设置senderId为发送者微信ID进行顺序消息规则
weChatResponse.setSenderId(wechatMessageContent.getWechat());
weChatResponse.setMessageRecordId(messageRecordId);
// TODO 初始化消息设置
// 持久化消息
MessageRecord messageRecord = new MessageRecord();
messageRecord.setMsg(JSONUtil.toJsonStr(weChatResponse));
messageRecord.setId(messageRecordId);
messageRecord.setMsgType(weChatResponse.getStatus());
int flag = messageRecordMapper.insert(messageRecord);
if (flag == 1) {
// 在缓存中进行事务判断
String key = RedisConstant.REDIS_WT_MRD + messageRecordId;
redisUtils.set(key, 0, 86400);
// 构建消息
Message<WeChatResponse> message = MessageBuilder.withPayload(weChatResponse)
.setHeader(MessageConst.PROPERTY_TAGS, MqConstant.WECHAT_TAG)
.build();
// 发送消息
chatMessageSource.wechatOutPut().send(message);
} else {
throw new BadRequestException("============微信聊天消息回调插入失败:=============" + weChatResponse);
List<WechatMessageContent> wechatMessageContentList =
JSONUtil.toList(array, WechatMessageContent.class);
WechatMessageContent wechatMessageContent =
wechatMessageContentList.get(DefaultNumberConstants.ZERO_NUMBER);
// 只接受消息
if (wechatMessageContent.getType() == DefaultNumberConstants.ZERO_NUMBER) {
// 查询是否有该用户
PersonalWeChatUser byWechat =
personalWeChatUserService.findByWechat
(wechatMessageContent.getToUserName(), wechatMessageContent.getWechat());
if (ObjectUtil.isNotNull(byWechat)) {
// 持久化消息
TransmissionMessageDTO messageRecord =
TransmissionMessageDTO.builder().
userId(byWechat.getUserId()).
personalWeChatNickname(byWechat.getNickname()).
personalWeChatAvatar(byWechat.getBigHeadImgUrl()).
wechatId(wechatMessageContent.getToUserName()).
userWechatId(wechatMessageContent.getWechat()).
msg(wechatMessageContent.getMsgContent()).
status(DefaultNumberConstants.ZERO_NUMBER).
isRead(DefaultNumberConstants.ZERO_NUMBER).
msgType(wechatMessageContent.getMsgType()).
id(messageRecordId).msgFlag(Boolean.FALSE).build();
// 构建消息
Message<TransmissionMessageDTO> message =
MessageBuilder.withPayload(messageRecord).setHeader
(MessageConst.PROPERTY_TAGS, MqConstant.WECHAT_TAG).build();
// 发送消息
chatMessageSource.wechatOutPut().send(message);
return;
}
log.error("============微信聊天消息回调插入失败 {} =============" + JSONUtil.toJsonStr(weChatResponse));
}
}
@ -112,9 +135,15 @@ public class WechatMessageServiceImpl implements WechatMessageService {
*
*/
public void wechatLogin(WeChatResponse weChatResponse) {
Result<String> result = wechatClient.frontRcvResponse(weChatResponse);
if (result.getCode() != HttpStatus.SUCCESS) {
log.info("========调用微信登录处理请求失败:{},{}==========", result, DateUtil.now());
WeChatUserLoginDTO userLoginDTO =
JSONUtil.toBean(JSONUtil.toJsonStr(weChatResponse.getData()), WeChatUserLoginDTO.class);
String cusQrUuId = userLoginDTO.getCusQrUuId();
// 回调不同地址
if (cusQrUuId.startsWith(AdPlatFormConstants.AD_PLATFORM)) {
HttpUtil.post(AdPlatFormConstants.AD_PLATFORM_CALLBACK, JSONUtil.toJsonStr(weChatResponse));
return;
}
wechatClient.frontRcvResponse(weChatResponse, SecurityConstants.FROM_IN);
}
}

@ -9,12 +9,36 @@ spring:
nacos:
discovery:
# server-addr: ${NACOS_HOST:8.130.96.163}:${NACOS_PORT:8848}
server-addr: ${NACOS_HOST:101.35.109.129}:${NACOS_PORT:8848}
server-addr: ${NACOS_HOST:8.130.96.163}:${NACOS_PORT:8848}
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
wechat-output:
destination: WECHAT-TOPIC # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
# Producer 配置项,对应 ProducerProperties 类
producer:
partition-key-expression: payload['id'] # 分区 key 表达式。该表达式基于 Spring EL从消息中获得分区 key。
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 8.130.96.163:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
wechat-output:
# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
producer:
delay-level-when-next-consume: 1 # 异步消费消息模式下消费失败重试策略,默认为 0
transactional: true #是否发送事务消息,默认为 false。
group: chat-msg #生产者分组
sync: true #是否同步发送消息,默认为 false 异步。
datasource:
druid:
db-type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://8.130.96.163:3306/eladmin?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull
url: jdbc:mysql://8.130.96.163:3306/db_wechat?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull
username: root
password: y7z7noq2
# 初始连接数

@ -0,0 +1,93 @@
#配置数据源
spring:
redis:
database: 3
host: 8.130.96.163
timeout: 5000
cloud:
nacos:
discovery:
# server-addr: ${NACOS_HOST:8.130.96.163}:${NACOS_PORT:8848}
server-addr: ${NACOS_HOST:8.130.96.163}:${NACOS_PORT:8848}
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
wechat-output:
destination: WECHAT-TOPIC # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
# Producer 配置项,对应 ProducerProperties 类
producer:
partition-key-expression: payload['id'] # 分区 key 表达式。该表达式基于 Spring EL从消息中获得分区 key。
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 39.103.195.38:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
wechat-output:
# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
producer:
delay-level-when-next-consume: 1 # 异步消费消息模式下消费失败重试策略,默认为 0
transactional: true #是否发送事务消息,默认为 false。
group: chat-msg #生产者分组
sync: true #是否同步发送消息,默认为 false 异步。
datasource:
druid:
db-type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://8.130.96.163:3306/db_wechat?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull
username: root
password: y7z7noq2
# 初始连接数
initial-size: 5
# 最小连接数
min-idle: 15
# 最大连接数
max-active: 30
# 超时时间(以秒数为单位)
remove-abandoned-timeout: 180
# 获取连接超时时间
max-wait: 3000
# 连接有效性检测时间
time-between-eviction-runs-millis: 60000
# 连接在池中最小生存的时间
min-evictable-idle-time-millis: 300000
# 连接在池中最大生存的时间
max-evictable-idle-time-millis: 900000
# 指明连接是否被空闲连接回收器(如果有)进行检验.如果检测失败,则连接将被从池中去除
test-while-idle: true
# 指明是否在从池中取出连接前进行检验,如果检验失败, 则从池中去除连接并尝试取出另一个
test-on-borrow: true
# 是否在归还到池中前进行检验
test-on-return: false
# 检测连接是否有效
validation-query: select 1
# 配置监控统计
webStatFilter:
enabled: true
stat-view-servlet:
enabled: true
url-pattern: /druid/*
reset-enable: false
filter:
stat:
enabled: true
# 记录慢SQL
log-slow-sql: true
slow-sql-millis: 1000
merge-sql: true
wall:
config:
multi-statement-allow: true
#是否开启 swagger-ui
swagger:
enabled: true
# IP 本地解析
ip:
local-parsing: true

@ -1,36 +1,12 @@
spring:
profiles:
active: dev
active: prod
application:
name: @artifactId@
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
wechat-output:
destination: WECHAT-TOPIC # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
# Producer 配置项,对应 ProducerProperties 类
producer:
partition-key-expression: payload['senderId'] # 分区 key 表达式。该表达式基于 Spring EL从消息中获得分区 key。
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 8.130.96.163:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
wechat-output:
# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
producer:
transactional: true #是否发送事务消息,默认为 false。
group: chat-msg #生产者分组
sync: true #是否同步发送消息,默认为 false 异步。
server:
port: 18080
port: 18000
#hutool雪花算法
snowflake:

@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2020 pig4cloud Authors. All Rights Reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration debug="false" scan="false">
<springProperty scop="context" name="spring.application.name" source="spring.application.name" defaultValue=""/>
<property name="log.path" value="logs/${spring.application.name}"/>
<!-- 彩色日志格式 -->
<property name="CONSOLE_LOG_PATTERN"
value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
<!-- 彩色日志依赖的渲染类 -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
<conversionRule conversionWord="wex"
converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/>
<conversionRule conversionWord="wEx"
converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
<!-- Console log output -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
</encoder>
</appender>
<!-- Log file debug output -->
<appender name="debug" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/debug.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.path}/%d{yyyy-MM, aux}/debug.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>50MB</maxFileSize>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%date [%thread] %-5level [%logger{50}] %file:%line - %msg%n</pattern>
</encoder>
</appender>
<!-- Log file error output -->
<appender name="error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/error.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.path}/%d{yyyy-MM}/error.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>50MB</maxFileSize>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%date [%thread] %-5level [%logger{50}] %file:%line - %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level>
</filter>
</appender>
<!--nacos 心跳 INFO 屏蔽-->
<logger name="com.alibaba.nacos" level="OFF">
<appender-ref ref="error"/>
</logger>
<!-- Level: FATAL 0 ERROR 3 WARN 4 INFO 6 DEBUG 7 -->
<root level="INFO">
<appender-ref ref="console"/>
<appender-ref ref="debug"/>
<appender-ref ref="error"/>
</root>
</configuration>

@ -1,5 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.baiye.module.dao.MessageRecordMapper">
</mapper>

@ -1,13 +1,42 @@
package com.baiye;
import cn.hutool.json.JSONUtil;
import com.baiye.core.constant.WeChatResponse;
import com.baiye.entity.MessageRecord;
import com.baiye.listener.ChatMessageSource;
import com.baiye.module.service.WechatMessageService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
class CdpToolMqProducerApplicationTests {
import javax.annotation.Resource;
import java.util.Random;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CdpToolMqProducerApplication.class,
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class CdpToolMqProducerApplicationTests {
@Autowired
private ChatMessageSource mySource;//<1>
@Resource
private WechatMessageService wechatMessageService;
@Test
void contextLoads() {
public void contextLoads() {
String s = "{\"data\":[{\"buffer\":\"\",\"createTime\":1660031216,\"fromUserName\":\"5629503579219451\",\"manual\":false,\"msgContent\":\"服务 blue-book 触发告警部份接口响应500异常, 数量 1时间2022-08-09 15:46:56详情http://vesta.51.nb/event/9667953消息发送给开发鲍裕盛张万华运维谢凡其他提醒人王俊杰\\n告警时间2022-08-09 15:46:56\\n<a href=\\\"http://vesta.51.nb/event/9667953\\\">查看图表</a>\",\"msgId\":4599457,\"msgKey\":\"1688850251907471&&4599457\",\"msgType\":1,\"newMsgId\":4599457,\"pushTime\":\"2022-08-09T15:46:58.779\",\"toUserName\":\"1688854488377509\",\"type\":0,\"wechat\":\"7881301825047621\"}],\"msg\":\"文本消息回调\",\"status\":203,\"success\":false}";
wechatMessageService.analyticalData(JSONUtil.toBean(s, WeChatResponse.class));
}
}

@ -44,15 +44,14 @@
<artifactId>jboss-marshalling-serial</artifactId>
<version>1.3.0.CR9</version>
</dependency>
<dependency>
<groupId>com.baiye</groupId>
<artifactId>cdp-wechat-server</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

@ -1,19 +0,0 @@
package com.baiye;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
/**
* @author Enzo
* @date 2022-7-20
*/
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class CdpToolNettyDisruptorApplication {
public static void main(String[] args) {
SpringApplication.run(CdpToolNettyDisruptorApplication.class, args);
}
}

@ -2,8 +2,8 @@ package com.baiye.app;
import com.baiye.common.MessageConsumer;
import com.baiye.common.RingBufferWorkerPoolFactory;
import com.baiye.server.MessageConsumerImpl;
import com.baiye.server.SocketServer;
import com.baiye.service.MessageConsumerImpl;
import com.baiye.service.SocketServer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.RequiredArgsConstructor;

@ -0,0 +1,10 @@
package com.baiye.service;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
public class ChannelGroupConfig {
//存储每一个客户端接入进来的对象
public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

@ -0,0 +1,41 @@
package com.baiye.service;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author Enzo
* @date 2022-8-10
* Netty ,
* HeartBeatHandler , ,
*/
@Slf4j
@Service
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
/**
*
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) {
case READER_IDLE:
log.info("读空闲触发。。。");
break;
case WRITER_IDLE:
log.info("写空闲触发。。。");
break;
case ALL_IDLE:
log.info("读写空闲触发。。。");
ctx.channel().close();
break;
default:
}
}
}
}

@ -1,4 +1,4 @@
package com.baiye.server;
package com.baiye.service;
import com.baiye.common.MessageConsumer;
import com.baiye.common.TranslatorDataWraper;
@ -17,7 +17,7 @@ public class MessageConsumerImpl extends MessageConsumer {
@Override
public void onEvent(TranslatorDataWraper translatorDataWraper) throws Exception {
log.info("收到客户端消息----------{}", translatorDataWraper.getMsg());
log.info("收到客户端消息----------{}", translatorDataWraper.getMsg());
ChannelHandlerContext ctx = translatorDataWraper.getContext();
ctx.channel().writeAndFlush("返回给客户端消息");

@ -1,4 +1,4 @@
package com.baiye.server;
package com.baiye.service;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;

@ -1,26 +1,47 @@
package com.baiye.server;
package com.baiye.service;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.baiye.request.MessageRequest;
/**
* @author Enzo
* @date : 2022/8/11
*/
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.http.HttpStatus;
import cn.hutool.json.JSONUtil;
import com.baiye.core.base.WebSocketResponse;
import com.baiye.core.base.api.Result;
import com.baiye.core.base.api.ResultCode;
import com.baiye.core.base.api.WebsocketCode;
import com.baiye.core.constant.DefaultNumberConstants;
import com.baiye.core.constant.EncryptionConstants;
import com.baiye.core.util.AesEncrypt;
import com.baiye.dto.PersonalWeChatUserDTO;
import com.baiye.dto.WeChatSendTextDTO;
import com.baiye.dto.WebsocketMessageRequestDTO;
import com.baiye.entity.MessageRecord;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
@ -29,13 +50,21 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
* @date 2022-7-21
*/
@Slf4j
@Component
@Service
@ChannelHandler.Sharable
public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
private WebSocketServerHandshaker handShaker;
private ChannelHandlerContext channelHandlerContext;
@Resource
private WeChatUserService weChatUserService;
@Resource
private MessageRecordService messageRecordService;
@Resource
private PersonalWeChatUserService personalWeChatUserService;
/**
@ -51,7 +80,7 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
/**
* channelidredishash
*/
private static final Map<String, Long> CLIENT_MAP = new ConcurrentHashMap<>();
private static final Map<String, String> CLIENT_MAP = new ConcurrentHashMap<>();
/**
@ -63,7 +92,6 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpRequest) {
//以http请求形式接入但是走的是websocket
handleHttpRequest(ctx, (FullHttpRequest) msg);
@ -71,27 +99,6 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
//处理websocket客户端的消息
handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
}
/*//接受客户端发送的消息
MessageRequest messageRequest = JSON.parseObject(msg.toString(), MessageRequest.class);
//每个channel都有idasLongText是全局channel唯一id
String key = ctx.channel().id().asLongText();
//存储channel的id和用户的主键
CLIENT_MAP.put(key, messageRequest.getUnionId());
log.info("接受客户端的消息......" + ctx.channel().remoteAddress() + "-参数[" + messageRequest.getUnionId() + "]");
if (!CHANNEL_MAP.containsKey(key)) {
//使用channel中的任务队列做周期循环推送客户端消息解决问题二和问题五
Future future = ctx.channel().eventLoop().scheduleAtFixedRate(new WebsocketRunnable(ctx, messageRequest), 0, 10, TimeUnit.SECONDS);
//存储客户端和服务的通信的Chanel
CHANNEL_MAP.put(key, ctx.channel());
//存储每个channel中的future保证每个channel中有一个定时任务在执行
FUTURE_MAP.put(key, future);
} else {
//每次客户端和服务的主动通信,和服务端周期向客户端推送消息互不影响 解决问题一
ctx.channel().writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "wdy"));
}
*/
}
/**
@ -112,8 +119,7 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
public void handlerRemoved(ChannelHandlerContext ctx) {
String key = ctx.channel().id().asLongText();
//移除通信过的channel
CHANNEL_MAP.remove(key);
@ -182,6 +188,24 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
// 添加WebSocketEncoder和WebSocketDecoder之后服务端就可以自动对WebSocket消息进行编解码了
handShaker.handshake(ctx.channel(), req);
}
//接受客户端发送的消息
String uri = req.uri();
if (StringUtils.isNotBlank(uri)) {
String wechatUri =
uri.substring(uri.indexOf("=") + DefaultNumberConstants.ONE_NUMBER);
String key = ctx.channel().id().asLongText();
String decrypt = AesEncrypt.decrypt(wechatUri, EncryptionConstants.ENCRYPTION_KEY);
//存储channel的id和用户的主键
CLIENT_MAP.put(key, decrypt);
log.info("接受客户端的消息......" + ctx.channel().remoteAddress() + "-参数[" + wechatUri + "]");
if (!CHANNEL_MAP.containsKey(decrypt) || !CHANNEL_MAP.get(decrypt).isActive()) {
//存储客户端和服务的通信的Chanel
CHANNEL_MAP.put(decrypt, ctx.channel());
} else {
//每次客户端和服务的主动通信,和服务端周期向客户端推送消息互不影响 解决问题一
ctx.channel().writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "wdy"));
}
}
}
/**
@ -190,7 +214,7 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
private static void sendHttpResponse(ChannelHandlerContext ctx,
FullHttpRequest req, DefaultFullHttpResponse res) {
// 返回应答给客户端
if (res.status().code() != 200) {
if (res.status().code() != HttpStatus.HTTP_OK) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
CharsetUtil.UTF_8);
res.content().writeBytes(buf);
@ -198,7 +222,7 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
}
ChannelFuture f = ctx.channel().writeAndFlush(res);
// 如果是非Keep-Alive关闭连接
if (!isKeepAlive(req) || res.status().code() != 200) {
if (!isKeepAlive(req) || res.status().code() != HttpStatus.HTTP_OK) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
@ -222,13 +246,26 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
throw new UnsupportedOperationException(String.format(
"%s frame types not supported", frame.getClass().getName()));
}
// 返回应答消息
String request = ((TextWebSocketFrame) frame).text();
log.debug("服务端收到:" + request);
TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()
+ ctx.channel().id() + "" + request);
ctx.channel().writeAndFlush(tws);
WebsocketMessageRequestDTO websocketMessageRequestDTO
= JSONUtil.toBean(((TextWebSocketFrame) frame).text(), WebsocketMessageRequestDTO.class);
messageRecordService.saveMessageByUserId(websocketMessageRequestDTO);
PersonalWeChatUserDTO personalWeChatUserDTO = personalWeChatUserService.queryByUserId(1L);
if (personalWeChatUserDTO != null) {
WeChatSendTextDTO weChatSendTextDTO = new WeChatSendTextDTO();
weChatSendTextDTO.setContent("");
weChatSendTextDTO.setTagetWxId(personalWeChatUserDTO.getWeChatId());
weChatSendTextDTO.setWechat(personalWeChatUserDTO.getEnterpriseId());
weChatUserService.sendText(weChatSendTextDTO);
}
WebSocketResponse<String> data = WebSocketResponse.data
(ResultCode.SUCCESS.getMsg(), WebsocketCode.CHAT_MSG.getMsg());
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(data)));
}
public static Map<String, Channel> getChannelMap() {
@ -239,7 +276,7 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
return FUTURE_MAP;
}
public static Map<String, Long> getClientMap() {
public static Map<String, String> getClientMap() {
return CLIENT_MAP;
}
@ -247,4 +284,13 @@ public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
String location = req.headers().get(HttpHeaderNames.HOST) + req.uri();
return "ws://" + location;
}
public void sendMessage(String wechatId, Map<String, Object> hashMap) {
Channel channel = CHANNEL_MAP.get(wechatId);
if (ObjectUtil.isNotNull(channel) && channel.isActive()) {
WebSocketResponse<Map<String, Object>> data = WebSocketResponse.data
(ResultCode.SUCCESS.getCode(), hashMap, WebsocketCode.CHAT_MSG.getMsg());
channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(data)));
}
}
}

@ -1,4 +1,4 @@
package com.baiye.server;
package com.baiye.service;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
@ -14,7 +14,7 @@ public class SocketServerInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast( new LengthFieldPrepender(4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new SocketServerHandler());

@ -0,0 +1,207 @@
package com.baiye.service;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author Enzo
* @date 2022-8-12
*/
@Slf4j
@Service
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
//用户id=>channel示例
//可以通过用户的唯一标识保存用户的channel
//这样就可以发送给指定的用户
public static Map<String, Channel> channelUserMap = new ConcurrentHashMap<>();
private WebSocketServerHandshaker webSocketServerHandshaker;
private static Logger LOGGER = LoggerFactory.getLogger(WebSocketHandler.class);
/**
* ,channelChannelGroup,channel
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//获取连接的channel
LOGGER.info("handlerAdded,连接channel{},连接id{}",ctx.channel(),ctx.channel().id());
ChannelGroupConfig.group.add(ctx.channel());
}
/**
*,channelChannelGroup,channel
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//从服务端的channelGroup中移除当前离开的客户端
ChannelGroupConfig.group.remove(channel);
//获得删除channle对应的userCode
String removeUserCode = "";
for (String userCode : channelUserMap.keySet()) {
Channel userChannel = channelUserMap.get(userCode);
if(userChannel.equals(channel)){
removeUserCode = userCode;
break;
}
}
//从服务端的channelMap中移除当前离开的客户端
Collection<Channel> col = channelUserMap.values();
while(true == col.contains(channel)) {
col.remove(ctx.channel());
LOGGER.info("handlerRemoved,netty客户端连接删除成功!,删除channel:{},channelId:{}",ctx.channel(),ctx.channel().id());
}
}
/**
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("channelActive,netty与客户端建立连接通道开启channel{}连接,连接id{}",ctx.channel(),ctx.channel().id());
}
/**
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("channelInactive,netty与客户端断开连接通道关闭channel:{},channelId:{}",ctx.channel(),ctx.channel().id());
}
//工程出现异常的时候调用
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable throwable)throws Exception{
LOGGER.info("exceptionCaught,抛出异常,异常信息{}异常信息channel:{},channelId:{}",throwable.getLocalizedMessage(),context.channel(),context.channel().id());
handlerRemoved(context);
context.close();
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
//处理客户端向服务端发起的http握手请求
if (o instanceof FullHttpRequest){
LOGGER.info("http连接请求");
handHttpRequest(channelHandlerContext,(FullHttpRequest) o);
}else if (o instanceof WebSocketFrame){//处理websocket链接业务
LOGGER.info("websocket信息请求");
handWebSocketFrame(channelHandlerContext,(WebSocketFrame) o);
}
}
/**
* websocket
* @param context
* @param webSocketFrame
*/
private void handWebSocketFrame(ChannelHandlerContext context,WebSocketFrame webSocketFrame){
if (webSocketFrame instanceof CloseWebSocketFrame){//判断是否是关闭websocket的指令
webSocketServerHandshaker.close(context.channel(),(CloseWebSocketFrame) webSocketFrame.retain());
}
if (webSocketFrame instanceof PingWebSocketFrame){//判断是否是ping消息
context.channel().write(new PongWebSocketFrame(webSocketFrame.content().retain()));
return;
}
if (!(webSocketFrame instanceof TextWebSocketFrame)){//判断是否是二进制消息
System.out.println("不支持二进制消息");
throw new RuntimeException(this.getClass().getName());
}
//获取客户端向服务端发送的消息
String text = ((TextWebSocketFrame) webSocketFrame ).text();
LOGGER.info("服务端收到客户端的消息:" + text);
}
/**
* http
* @param context
* @param fullHttpRequest
*/
private void handHttpRequest(ChannelHandlerContext context,FullHttpRequest fullHttpRequest){
LOGGER.info("请求连接的channel{}id为{}",context.channel(),context.channel().id());
//判断是否http握手请求
if (!fullHttpRequest.getDecoderResult().isSuccess()
||!("websocket".equals(fullHttpRequest.headers().get("Upgrade")))){
sendHttpResponse(context,fullHttpRequest, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
getWebSocketLocation(fullHttpRequest), null, true, 5 * 1024 * 1024);
webSocketServerHandshaker = wsFactory.newHandshaker(fullHttpRequest);
if (webSocketServerHandshaker == null){
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(context.channel());
}else{
webSocketServerHandshaker.handshake(context.channel(),fullHttpRequest);
}
//把token解析成用户Code
Channel channel = context.channel();
String uri = fullHttpRequest.getUri();
String userCode = uri.substring(uri.lastIndexOf("?")+1,uri.length());
channelUserMap.put(userCode,channel);
}
/**
*
* @param context
* @param fullHttpRequest
* @param defaultFullHttpResponse
*/
private void sendHttpResponse(ChannelHandlerContext context, FullHttpRequest fullHttpRequest, DefaultFullHttpResponse defaultFullHttpResponse){
if (defaultFullHttpResponse.getStatus().code() != 200){
ByteBuf buf = Unpooled.copiedBuffer(defaultFullHttpResponse.getStatus().toString(), CharsetUtil.UTF_8);
defaultFullHttpResponse.content().writeBytes(buf);
buf.release();
}
//服务端向客户端发送数据
ChannelFuture future = context.channel().writeAndFlush(defaultFullHttpResponse);
if (defaultFullHttpResponse.getStatus().code() !=200){
future.addListener(ChannelFutureListener.CLOSE);
}
}
private static String getWebSocketLocation(FullHttpRequest req) {
String location = req.headers().get(HttpHeaderNames.HOST) + req.uri();
return "ws://" + location;
}
}

@ -1,13 +1,11 @@
package com.baiye.server;
package com.baiye.service;
import com.baiye.codec.MarshallingCodeCFactory;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
@ -24,32 +22,39 @@ public class WebsocketChannelInitializer extends ChannelInitializer<SocketChanne
private final SocketServerHandler socketServerHandler;
private final HeartBeatHandler heartBeatHandler;
@Value("${websocket.url}")
private String websocketUrl;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//获取pipeline通道
ChannelPipeline pipeline = socketChannel.pipeline();
//获取pipeline通道
//因为基于http协议使用http的编码和解码器
pipeline.addLast(new HttpServerCodec());
// http 消息聚合器 512*1024为接收的最大content-length
pipeline.addLast(new HttpObjectAggregator(8192));
//是以块方式写添加ChunkedWriteHandler处理器
pipeline.addLast(new ChunkedWriteHandler());
/*
// http 消息聚合器 512*1024为接收的最大content-length
pipeline.addLast(new HttpObjectAggregator(65536));
/*
1. websocket (frame)
2. WebSocketFrame
3. ws://localhost:7000/msg 表示请求的uri
4. WebSocketServerProtocolHandler http ws ,
5. 101
*/
*/
pipeline.addLast(new WebSocketServerProtocolHandler(websocketUrl));
// Netty 心跳处理、以及读写超时的设置
pipeline.addLast(heartBeatHandler);
// 处理类
pipeline.addLast(socketServerHandler);
}
}

@ -1,4 +1,4 @@
package com.baiye.server;
package com.baiye.service;
import com.baiye.request.MessageRequest;
import io.netty.channel.ChannelHandlerContext;
@ -18,7 +18,7 @@ public class WebsocketRunnable implements Runnable {
private MessageRequest messageRequest;
public WebsocketRunnable(ChannelHandlerContext channelHandlerContext,MessageRequest messageRequest) {
public WebsocketRunnable(ChannelHandlerContext channelHandlerContext, MessageRequest messageRequest) {
this.channelHandlerContext = channelHandlerContext;
this.messageRequest = messageRequest;
}
@ -28,7 +28,7 @@ public class WebsocketRunnable implements Runnable {
try {
channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame(LocalDateTime.now().toString()));
} catch (Exception e) {
log.error("websocket服务器推送消息发生错误",e);
log.error("websocket服务器推送消息发生错误", e);
}
}
}

@ -37,17 +37,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.baiye</groupId>
<artifactId>cdp-wechat-server</artifactId>
<version>1.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<artifactId>hutool-all</artifactId>
<groupId>cn.hutool</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -70,6 +59,7 @@
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.github.ulisesbocchio</groupId>
<artifactId>jasypt-spring-boot-starter</artifactId>
@ -87,14 +77,6 @@
</exclusions>
</dependency>
<!--<dependency>
<groupId>com.baiye</groupId>
<artifactId>cdp-common-security</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
@ -102,21 +84,79 @@
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<exclusions>
<exclusion>
<artifactId>asm</artifactId>
<groupId>org.ow2.asm</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
<scope>test</scope>
<groupId>com.baiye</groupId>
<artifactId>cdp-common-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<artifactId>hutool-all</artifactId>
<groupId>cn.hutool</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.baiye</groupId>
<artifactId>cdp-common-security</artifactId>
<exclusions>
<exclusion>
<artifactId>bcpkix-jdk15on</artifactId>
<groupId>org.bouncycastle</groupId>
</exclusion>
<exclusion>
<artifactId>HdrHistogram</artifactId>
<groupId>org.hdrhistogram</groupId>
</exclusion>
<exclusion>
<groupId>org.micah</groupId>
<artifactId>eladmin-api-system</artifactId>
</exclusion>
</exclusions>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
<groupId>com.baiye</groupId>
<artifactId>cdp-tool-netty-disruptor</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alicloud-oss</artifactId>
</dependency>
</dependencies>
<!-- 配置插件 -->
<build>
<plugins>
<!-- springboot 打包插件 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.baiye.WechatApiApplication</mainClass>
<layout>ZIP</layout>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -1,19 +1,26 @@
package com.baiye;
import com.baiye.annotation.EnableCustomizeFeignClient;
import com.baiye.annotation.EnableCustomizeResourceServer;
import com.baiye.listener.MySink;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* @author wjt
* @date 2022/07/14
*/
@EnableScheduling
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(MySink.class)
@MapperScan("com.baiye.dao")
@EnableFeignClients(basePackages = "com.baiye")
@EnableCustomizeResourceServer
@EnableCustomizeFeignClient(basePackages = "com.baiye")
public class WechatApiApplication {
public static void main(String[] args) {
SpringApplication.run(WechatApiApplication.class, args);

@ -0,0 +1,58 @@
package com.baiye.controller;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.aliyun.oss.OSSClient;
import com.baiye.core.base.api.Result;
import com.baiye.core.constant.DefaultNumberConstants;
import com.baiye.exception.global.BadRequestException;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
import java.net.URL;
/**
* @author Enzo
* @date : 2022/8/10
*/
@Slf4j
@RestController
@RequestMapping("/wechat/upload")
public class AliYunOssUpload {
@Resource
private OSSClient ossClient;
@Value("${aliyun.bucketname}")
private String bucketname;
@SneakyThrows
@PostMapping("/images")
public Result<String> uploadOssFile(MultipartFile file) {
if (ObjectUtil.isNotNull(file) &&
StringUtils.isNotBlank(file.getOriginalFilename())) {
String objectName = DateUtil.formatDate
(DateUtil.date()).concat(StrUtil.SLASH)
.concat(file.getOriginalFilename());
ossClient.putObject(bucketname, objectName, file.getInputStream());
// 生成以GET方法访问的签名URL访客可以直接通过浏览器访问相关内容。
DateTime dateTime = DateUtil.offsetDay(DateUtil.date(), DefaultNumberConstants.TWO_HUNDRED);
URL url = ossClient.generatePresignedUrl
(bucketname, objectName, dateTime);
ossClient.shutdown();
return Result.data(url.toString());
}
throw new BadRequestException("上传图片失败");
}
}

@ -6,6 +6,7 @@ import com.baiye.core.page.PageResult;
import com.baiye.dto.ResponseMessageRecordDTO;
import com.baiye.query.MessageRecordQuery;
import com.baiye.service.MessageRecordService;
import com.baiye.util.SecurityUtils;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -33,44 +34,42 @@ public class MessageController {
@ApiOperation("查询历史信息")
@GetMapping("/list")
public ResponseEntity<List<ResponseMessageRecordDTO>> query(Pageable pageable) {
public Result<List<ResponseMessageRecordDTO>> query(Pageable pageable) {
// Long currentUserId = SecurityUtils.getCurrentUserId();
return new ResponseEntity<>
(this.messageRecordService.queryAll(pageable, 1L), HttpStatus.OK);
return Result.data((this.messageRecordService.queryAll(pageable, SecurityUtils.getCurrentUserId())));
}
@ApiOperation("已读信息")
@GetMapping("/read")
public ResponseEntity<Result<ResultCode>> readMessage(Long userId) {
public Result<Result<ResultCode>> readMessage(Long userId) {
this.messageRecordService.readMessage(userId);
return new ResponseEntity<>(Result.success(), HttpStatus.OK);
return Result.success();
}
@ApiOperation("未读信息")
@GetMapping("/notRead")
public ResponseEntity<Result<ResultCode>> notReadMessage(Long userId) {
public Result<ResultCode> notReadMessage(Long userId) {
this.messageRecordService.notReadMessage(userId);
return new ResponseEntity<>(Result.success(), HttpStatus.OK);
return Result.success();
}
@ApiOperation("删除信息")
@DeleteMapping("/delete")
public ResponseEntity<PageResult> deleteMessage(Long userId) {
public Result<PageResult> deleteMessage(Long userId) {
this.messageRecordService.deleteMessage(userId);
return new ResponseEntity<>(HttpStatus.OK);
return Result.success();
}
@ApiOperation("消息分页")
@GetMapping("/messagePage")
public ResponseEntity<PageResult> queryMessageByUserId
public Result<PageResult> queryMessageByUserId
(MessageRecordQuery messageRecordQuery, Pageable pageable) {
return new ResponseEntity<>
(this.messageRecordService.queryMessageByUserId(messageRecordQuery,pageable), HttpStatus.OK);
return Result.data((this.messageRecordService.
queryMessageByUserId(messageRecordQuery, pageable)));
}

@ -0,0 +1,50 @@
package com.baiye.controller;
import com.baiye.core.base.api.Result;
import com.baiye.core.page.PageResult;
import com.baiye.query.WeChatPersonQuery;
import com.baiye.service.PersonalWeChatUserService;
import com.baiye.util.SecurityUtils;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Pageable;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author Enzo
* @date : 2022/8/8
*/
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/wechat/addressBook")
public class PersonalWeChatUserController {
private final PersonalWeChatUserService personalWeChatUserService;
@ApiOperation("通讯录分页")
@GetMapping("/page")
public Result<PageResult> query(WeChatPersonQuery weChatWordsQuery, Pageable pageable) {
weChatWordsQuery.setUserId(SecurityUtils.getCurrentUserId());
return Result.data(this.personalWeChatUserService.selectPage(weChatWordsQuery, pageable));
}
@ApiOperation("修改星标")
@GetMapping("/updateStar")
public Result<Void> updateStar(Long userId, Boolean startFlag) {
Long currentUserId = SecurityUtils.getCurrentUserId();
this.personalWeChatUserService.changeUserStart(userId, currentUserId, startFlag);
return Result.success();
}
@ApiOperation("星标分页")
@GetMapping("/startPage")
public Result<PageResult> startPageQuery(WeChatPersonQuery weChatWordsQuery, Pageable pageable) {
weChatWordsQuery.setUserId(SecurityUtils.getCurrentUserId());
weChatWordsQuery.setIsStart(Boolean.TRUE);
return Result.data(this.personalWeChatUserService.selectPage(weChatWordsQuery, pageable));
}
}

@ -1,9 +1,11 @@
package com.baiye.controller;
import com.baiye.core.base.api.Result;
import com.baiye.core.page.PageResult;
import com.baiye.entity.WeChatWords;
import com.baiye.query.WeChatWordsQuery;
import com.baiye.service.WeChatWordsService;
import com.baiye.util.SecurityUtils;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -31,28 +33,33 @@ public class WeChatWordsController {
@ApiOperation("企微分页查询")
@GetMapping("/list")
public ResponseEntity<PageResult> getWeChatUsers(WeChatWordsQuery weChatWordsQuery, Pageable pageable) {
return new ResponseEntity<>(this.weChatWordsService.queryWeChatWords(weChatWordsQuery,pageable), HttpStatus.OK);
public Result<PageResult> getWeChatUsers(WeChatWordsQuery weChatWordsQuery, Pageable pageable) {
weChatWordsQuery.setUserId(SecurityUtils.getCurrentUserId());
return Result.data
(this.weChatWordsService.queryWeChatWords
(weChatWordsQuery, pageable));
}
@ApiOperation("新增话术")
@PostMapping("/create")
public ResponseEntity<Void> create(@Validated @RequestBody WeChatWords resources) {
weChatWordsService.create(resources);
return new ResponseEntity<>(HttpStatus.CREATED);
public Result<Void> create(@Validated @RequestBody WeChatWords resources) {
resources.setUserId(SecurityUtils.getCurrentUserId());
weChatWordsService.create( resources);
return Result.success();
}
@ApiOperation("修改话术")
@PostMapping("/update")
public ResponseEntity<Void> update(@Validated @RequestBody WeChatWords resources) {
public Result<Void> update(@Validated @RequestBody WeChatWords resources) {
resources.setUserId(SecurityUtils.getCurrentUserId());
weChatWordsService.updateWechatWords(resources);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
return Result.success();
}
@ApiOperation("删除话术")
@PostMapping("/delete")
public ResponseEntity<Void> delete(@RequestBody Set<Long> ids) {
public Result<Void> delete(@RequestBody Set<Long> ids) {
weChatWordsService.deleteWechatWordByIds(ids);
return new ResponseEntity<>(HttpStatus.OK);
return Result.success();
}
}

@ -1,6 +1,7 @@
package com.baiye.controller;
import cn.hutool.core.collection.CollUtil;
import com.baiye.annotation.Inner;
import com.baiye.core.annotation.Log;
import com.baiye.core.base.api.Result;
import com.baiye.core.base.api.ResultCode;
@ -10,6 +11,7 @@ import com.baiye.dto.*;
import com.baiye.entity.EnterpriseWeChatUser;
import com.baiye.query.WeChatUserQuery;
import com.baiye.service.WeChatUserService;
import com.baiye.util.SecurityUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
@ -46,6 +48,7 @@ public class WechatController {
return weChatUserService.getWeChatUsersOnline();
}
@Inner
@PostMapping(value = "/login")
public Result<String> frontRcvResponse(@RequestBody WeChatResponse weChatResponse) {
weChatUserService.analyticalData(weChatResponse);
@ -56,7 +59,7 @@ public class WechatController {
@ApiOperation("获取二维码")
@GetMapping("/getQrCode")
public Result<String> getQrCode(String wechatId, String uuid, String appKey, String appSecret) {
return weChatUserService.getQrCode(wechatId, uuid, appKey, appSecret);
return weChatUserService.getQrCode(wechatId, uuid, appKey, appSecret, SecurityUtils.getCurrentUserId());
}
@ -103,8 +106,8 @@ public class WechatController {
@Log("获取微信id")
@ApiOperation("获取微信id")
@GetMapping("/getWxId")
public List<String> getWxId(@RequestParam("userId") Long userId) {
return weChatUserService.getWeChatIds(userId);
public List<String> getWxId() {
return weChatUserService.getWeChatIds(SecurityUtils.getCurrentUserId());
}

@ -0,0 +1,128 @@
package com.baiye.job;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONUtil;
import com.baiye.core.base.api.Result;
import com.baiye.core.constant.DefaultNumberConstants;
import com.baiye.core.constant.HttpStatus;
import com.baiye.core.constant.WeChatRequestConstants;
import com.baiye.dto.SyncWeChatAddressBookDTO;
import com.baiye.dto.WeChatAddressBookDTO;
import com.baiye.entity.EnterpriseWeChatUser;
import com.baiye.entity.PersonalWeChatUser;
import com.baiye.properties.WeChatProperties;
import com.baiye.service.PersonalWeChatUserService;
import com.baiye.service.WeChatUserService;
import com.google.common.collect.Lists;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author Enzo
* @date : 2022/8/8
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class SyncFriendsTask {
private final WeChatProperties weChatProperties;
private final WeChatUserService weChatUserService;
private final PersonalWeChatUserService personalWeChatUserService;
/**
* 8
*/
@Scheduled(cron = "0 0/8 * * * ?")
public void syncContacts() {
long startTime = DateUtil.date().getTime();
log.info("=================== the sync account time as {} ===================", startTime);
List<EnterpriseWeChatUser> byWeChatList =
weChatUserService.findByWeChatList();
if (CollUtil.isNotEmpty(byWeChatList)) {
for (EnterpriseWeChatUser chatUser : byWeChatList) {
Result<Object> objectResult =
weChatUserService.syncContact(chatUser.getWxId());
log.info("=================== the sync account result as {} =================== ",
JSONUtil.toJsonStr(objectResult));
}
}
long endTime = DateUtil.date().getTime();
log.info("=================== the sync account consuming time as {} =================== ", endTime - startTime);
}
/**
* 10
*/
@Scheduled(cron = "0 0/10 * * * ?")
public void syncUser() {
// 查询所有用户
List<EnterpriseWeChatUser> byWeChatList =
weChatUserService.findByWeChatList();
if (CollUtil.isNotEmpty(byWeChatList)) {
for (EnterpriseWeChatUser chatUser : byWeChatList) {
// 查询通讯录
Result<Object> allContact = weChatUserService.getAllContact(chatUser.getWxId());
if (allContact.getCode() == HttpStatus.SUCCESS) {
List<WeChatAddressBookDTO> chatAddressBookDTOList =
Convert.toList(WeChatAddressBookDTO.class, allContact.getData());
if (CollUtil.isNotEmpty(chatAddressBookDTOList)) {
// 查询已插入用户
List<String> strings =
personalWeChatUserService.
selectUserIdListByCurrentUserId(chatUser.getWxId());
// list转集合
Map<String, WeChatAddressBookDTO> collect =
chatAddressBookDTOList.stream().collect
(Collectors.toMap(WeChatAddressBookDTO::getUserName,
account -> account));
// set差集比较
Set<String> querySet = collect.keySet();
Set<String> newHashSet = Sets.newHashSet(strings);
Sets.SetView<String> difference =
Sets.difference(querySet, newHashSet);
log.info("======================= the difference size as {} =======================",difference.size());
for (String weChatUserId : difference) {
WeChatAddressBookDTO dto = collect.get(weChatUserId);
// 时间判断
if (dto.getAddTime() != null &&
dto.getAddTime() > DefaultNumberConstants.ZERO_NUMBER) {
PersonalWeChatUser weChatUser = new PersonalWeChatUser();
weChatUser.setSex(dto.getSex());
weChatUser.setIsStar(Boolean.FALSE);
weChatUser.setNickname(dto.getNickName());
weChatUser.setWeChatId(dto.getUserName());
weChatUser.setUsername(dto.getUserName());
weChatUser.setUserId(chatUser.getOwnerId());
weChatUser.setEnterpriseId(chatUser.getWxId());
weChatUser.setAddTime(DateUtil.date(dto.getAddTime()));
weChatUser.setBigHeadImgUrl(dto.getBigHeadImgUrl());
// 插入数据
personalWeChatUserService.save(weChatUser);
}
}
}
}
}
}
}
}

@ -0,0 +1,14 @@
package com.baiye.listener;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface MySink {
String ERBADAGANG_INPUT = "wechat-input";
@Input(ERBADAGANG_INPUT)
SubscribableChannel demo01Input();
}

@ -0,0 +1,60 @@
package com.baiye.listener;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import com.baiye.core.constant.CacheKey;
import com.baiye.core.constant.DefaultNumberConstants;
import com.baiye.core.util.RedisUtils;
import com.baiye.dto.TransmissionMessageDTO;
import com.baiye.entity.EnterpriseWeChatUser;
import com.baiye.service.PersonalWeChatUserService;
import com.baiye.service.SocketServerHandler;
import com.baiye.service.WeChatUserService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* @author Enzo
* @date 2022-8-9
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class WeChatMessageConsumer {
private final RedisUtils redisUtils;
private final WeChatUserService weChatUserService;
private final SocketServerHandler socketServerHandler;
@StreamListener(MySink.ERBADAGANG_INPUT)
public void onMessage(Message<?> message) {
TransmissionMessageDTO messageDTO = JSONUtil.toBean
(JSONUtil.toJsonStr(message.getPayload()), TransmissionMessageDTO.class);
if (ObjectUtil.isNotNull(messageDTO)) {
Long recordId = messageDTO.getId();
String wechatId = messageDTO.getWechatId();
// 防止重复消费
Object messageRecordCache =
redisUtils.get(CacheKey.MESSAGE_KEY + recordId);
if (ObjectUtil.isNull(messageRecordCache)) {
EnterpriseWeChatUser byWechat
= weChatUserService.findByWechat(wechatId);
messageDTO.setEnterpriseWeChatNickname(byWechat.getNickname());
messageDTO.setEnterpriseWeChatAvatar(byWechat.getSmallHeadImgUrl());
socketServerHandler.sendMessage
(wechatId, BeanUtil.beanToMap(messageDTO));
redisUtils.set(CacheKey.MESSAGE_KEY + recordId,
recordId, DefaultNumberConstants.NINETY, TimeUnit.MINUTES);
}
}
log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}

@ -1,10 +1,32 @@
#配置数据源
spring:
cloud:
nacos:
discovery:
server-addr: ${NACOS_HOST:8.130.96.163}:${NACOS_PORT:8848}
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
wechat-input:
destination: WECHAT-TOPIC # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
group: wechat-consumer-group-wechat-topic-01 # 消费者分组,命名规则:组名+topic名
consumer:
max-attempts: 2
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 8.130.96.163:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
wechat-input:
# RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
consumer:
enabled: true # 是否开启消费,默认为 true
broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费
orderly: true # 是否顺序消费,默认为 false 并发消费。
redis:
database: 3
host: 8.130.96.163

@ -0,0 +1,90 @@
#配置数据源
spring:
cloud:
nacos:
discovery:
server-addr: ${NACOS_HOST:172.16.69.134}:${NACOS_PORT:8848}
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
wechat-input:
destination: WECHAT-TOPIC # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
group: wechat-consumer-group-wechat-topic-01 # 消费者分组,命名规则:组名+topic名
consumer:
max-attempts: 2
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 39.103.195.38:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
wechat-input:
# RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
consumer:
enabled: true # 是否开启消费,默认为 true
broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费
orderly: true # 是否顺序消费,默认为 false 并发消费。
redis:
database: 3
host: 8.130.96.163
timeout: 5000
datasource:
druid:
db-type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://rm-bp1693kl5d490o5cn.mysql.rds.aliyuncs.com:3306/db_wechat?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull
username: root
password: yuyou@RDS2020
# 初始连接数
initial-size: 5
# 最小连接数
min-idle: 15
# 最大连接数
max-active: 30
# 超时时间(以秒数为单位)
remove-abandoned-timeout: 180
# 获取连接超时时间
max-wait: 3000
# 连接有效性检测时间
time-between-eviction-runs-millis: 60000
# 连接在池中最小生存的时间
min-evictable-idle-time-millis: 300000
# 连接在池中最大生存的时间
max-evictable-idle-time-millis: 900000
# 指明连接是否被空闲连接回收器(如果有)进行检验.如果检测失败,则连接将被从池中去除
test-while-idle: true
# 指明是否在从池中取出连接前进行检验,如果检验失败, 则从池中去除连接并尝试取出另一个
test-on-borrow: true
# 是否在归还到池中前进行检验
test-on-return: false
# 检测连接是否有效
validation-query: select 1
# 配置监控统计
webStatFilter:
enabled: true
stat-view-servlet:
enabled: true
url-pattern: /druid/*
reset-enable: false
filter:
stat:
enabled: true
# 记录慢SQL
log-slow-sql: true
slow-sql-millis: 1000
merge-sql: false
wall:
config:
multi-statement-allow: true
#是否开启 swagger-ui
swagger:
enabled: true
# IP 本地解析
ip:
local-parsing: true

@ -1,11 +1,13 @@
server:
port: 8085
websocket:
port: 8099
url: websocket
spring:
main:
allow-bean-definition-overriding: true
application:
name: @artifactId@
name: cdp-wechat-api
freemarker:
check-template-location: false
profiles:
@ -20,12 +22,45 @@ spring:
multipart:
max-request-size: 200MB
max-file-size: 200MB
cloud:
#新版写法
alicloud:
access-key: R7hb84lBwjJQasch
secret-key: 2ngpjkzXBKHPZhVj2yDZSGVX3fXAwT
oss:
endpoint: https://oss-cn-hangzhou.aliyuncs.com
bucket: baiye-cdp
mybatis-plus:
mapper-locations: classpath:mapper/*Mapper.xml
security:
oauth2:
client:
clientId: admin-web
clientSecret: 123456
userAuthorizationUri: http://auth-server:7070/oauth/authorize
grant-type: password
scope: server
access-token-uri: http://auth-server:7070/oauth/token
resource:
token-info-uri: http://auth-server:7070/oauth/check_token
ignore:
urls:
- /v2/api-docs
- /actuator/**
- /user/info/*
- /operlog
- /logininfor
- /oauth/check_token
snowflake:
workerId: 9
datacenterId: 9
aliyun:
bucketname: baiye-cdp
wechat:
gatewayHost: bydw.wework.uat.robot.ecofanli.com
gatewayHost: byqw.wework.uat.robot.ecofanli.com

@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2020 pig4cloud Authors. All Rights Reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration debug="false" scan="false">
<springProperty scop="context" name="spring.application.name" source="spring.application.name" defaultValue=""/>
<property name="log.path" value="logs/${spring.application.name}"/>
<!-- 彩色日志格式 -->
<property name="CONSOLE_LOG_PATTERN"
value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
<!-- 彩色日志依赖的渲染类 -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
<conversionRule conversionWord="wex"
converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/>
<conversionRule conversionWord="wEx"
converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
<!-- Console log output -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
</encoder>
</appender>
<!-- Log file debug output -->
<appender name="debug" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/debug.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.path}/%d{yyyy-MM, aux}/debug.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>50MB</maxFileSize>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%date [%thread] %-5level [%logger{50}] %file:%line - %msg%n</pattern>
</encoder>
</appender>
<!-- Log file error output -->
<appender name="error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/error.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.path}/%d{yyyy-MM}/error.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>50MB</maxFileSize>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%date [%thread] %-5level [%logger{50}] %file:%line - %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level>
</filter>
</appender>
<!--nacos 心跳 INFO 屏蔽-->
<logger name="com.alibaba.nacos" level="OFF">
<appender-ref ref="error"/>
</logger>
<!-- Level: FATAL 0 ERROR 3 WARN 4 INFO 6 DEBUG 7 -->
<root level="INFO">
<appender-ref ref="console"/>
<appender-ref ref="debug"/>
<appender-ref ref="error"/>
</root>
</configuration>

@ -27,17 +27,7 @@ public class MessageRecordTest {
@Test
public void saveRecord() {
for (int i = 0; i < 20; i++) {
MessageRecord record = new MessageRecord();
int i1 = RandomUtil.randomInt(2);
record.setStatus(1);
record.setIsRead(i1);
record.setMsg("我是老" + i);
record.setMsgFlag(i1 > 1);
record.setMsgType(1);
record.setUserId(2L);
record.setWechatId("1688855725361602");
record.setUserWechatId("1688850251907471");
messageRecordService.save(record);
}

@ -0,0 +1,58 @@
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.OSSClientBuilder;
import com.baiye.WechatApiApplication;
import com.baiye.properties.AliYunOssProperties;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.net.URL;
/**
* @author Enzo
* @date : 2022/8/10
*/
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = WechatApiApplication.class,
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class UploadTest {
@Resource
private OSSClient ossClient;
@Test
public void upload(){
// 上传文件流。
InputStream inputStream = null;
try {
inputStream = new FileInputStream("");
} catch (FileNotFoundException e) {
e.printStackTrace();
}
String objectName = DateUtil.formatDate
(DateUtil.date()).concat("/")
.concat("test.png");
ossClient.putObject("baiye-cdp", objectName, inputStream);
// 生成以GET方法访问的签名URL访客可以直接通过浏览器访问相关内容。
DateTime dateTime = DateUtil.offsetDay(DateUtil.date(), 180);
URL url = ossClient.generatePresignedUrl
("baiye-cdp", objectName, dateTime);
ossClient.shutdown();
url.toString();
}
}

@ -16,7 +16,6 @@
<dependencies>
<dependency>
<groupId>com.baiye</groupId>
<artifactId>cdp-common-core</artifactId>

@ -0,0 +1,25 @@
package com.baiye.convert;
import cn.hutool.extra.emoji.EmojiUtil;
import javax.persistence.AttributeConverter;
import javax.persistence.Converter;
/**
* @author Enzo
* @date : 2021/12/28
*/
@Converter
public class EmojiConverterListJson implements AttributeConverter<Object, String> {
@Override
public String convertToDatabaseColumn(Object obj) {
return EmojiUtil.toAlias(obj.toString());
}
@Override
public Object convertToEntityAttribute(String s) {
return EmojiUtil.toUnicode(s);
}
}

@ -1,5 +1,6 @@
package com.baiye.dto;
import com.baomidou.mybatisplus.annotation.TableField;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@ -23,17 +24,20 @@ public class PersonalWeChatUserDTO {
private String bigHeadImgUrl;
@ApiModelProperty(value = "昵称")
private String nickName;
private String nickname;
@ApiModelProperty(value = "名称")
private String userName;
private String username;
@ApiModelProperty(value = "性别")
private Integer sex;
@ApiModelProperty(value = "用户id")
private Long userId;
@ApiModelProperty(value = "企业微信id")
private String enterpriseId;
@ApiModelProperty(value = "添加时间")
private Date addTime;
@ApiModelProperty(value = "是否星标")
private Boolean isStar;
}

@ -18,6 +18,8 @@ public class ResponseMessageRecordDTO {
@ApiModelProperty("未读数量")
private Integer notReadNum;
@ApiModelProperty("是否星标")
private Boolean isStart;
@ApiModelProperty("是否有未读")
private Boolean isRead;

@ -0,0 +1,16 @@
package com.baiye.dto;
import lombok.Data;
import java.util.List;
/**
* @author Enzo
* @date : 2022/8/8
*/
@Data
public class SyncWeChatAddressBookDTO {
private List<WeChatAddressBookDTO> data;
}

@ -0,0 +1,64 @@
package com.baiye.dto;
import io.swagger.annotations.ApiModelProperty;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* @author Enzo
* @date : 2022/8/11
*/
@Data
@Builder
public class TransmissionMessageDTO implements Serializable {
private static final long serialVersionUID = -6341616711428582529L;
@ApiModelProperty(value = "ID")
private Long id;
private String wechatId;
@ApiModelProperty(value = "用户微信id")
private String userWechatId;
@ApiModelProperty(value = "微信属于哪个用户id")
private Long userId;
@ApiModelProperty(value = "mq消息")
private String msg;
@ApiModelProperty(value = "消息状态")
private Integer status;
@ApiModelProperty(value = "消息类型")
private Integer msgType;
@ApiModelProperty(value = "是否读取0未读 1已读")
private Integer isRead;
@ApiModelProperty(value = "创建时间")
private Date createTime;
@ApiModelProperty(value = "true发送false接收")
private Boolean msgFlag;
@ApiModelProperty(value = "企业微信昵称")
private String enterpriseWeChatNickname;
@ApiModelProperty(value = "企业微信头像")
private String enterpriseWeChatAvatar;
@ApiModelProperty(value = "个人微信昵称")
private String personalWeChatNickname;
@ApiModelProperty(value = "个人微信头像")
private String personalWeChatAvatar;
}

@ -0,0 +1,27 @@
package com.baiye.dto;
import lombok.Data;
/**
* @author Enzo
* @date : 2022/8/8
*/
@Data
public class WeChatAddressBookDTO {
private Integer sex;
private Integer type;
private Long addTime;
private Integer msgType;
private String userName;
private String nickName;
private String companyId;
private Integer verifyFlag;
private Integer contactType;
private Integer maybeSource1;
private Integer maybeSource2;
private String bigHeadImgUrl;
private Integer chatroomVersion;
}

@ -14,17 +14,17 @@ public class WeChatSendTextDTO {
/**
* ID
*/
@NonNull
private String wechat;
/**
* (encoder)
*/
@NonNull
private String content;
/**
* id,@chatroom
*/
@NonNull
private String tagetWxId;
/**
* @id ,

@ -0,0 +1,28 @@
package com.baiye.dto;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* @author Enzo
* @date : 2022/8/12
*/
@Data
public class WebsocketMessageRequestDTO {
@ApiModelProperty(value = "用户微信id")
private String userWechatId;
@ApiModelProperty(value = "微信用户id")
private Long userId;
@ApiModelProperty(value = "企业微信")
private String enterpriseId;
@ApiModelProperty(value = "消息主要内容")
private String content;
@ApiModelProperty(value = "消息类型")
private Integer messageType;
}

@ -1,11 +1,14 @@
package com.baiye.entity;
import com.baiye.core.base.BaseEntity;
import com.baiye.core.constant.DefaultNumberConstants;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import javax.persistence.Column;
import java.io.Serializable;
@ -15,13 +18,14 @@ import java.util.Date;
* @author wjt
* @date 2022/07/14
*/
@Data
@Getter
@Setter
@TableName("tb_enterprise_wechat_user")
public class EnterpriseWeChatUser extends BaseEntity implements Serializable {
private static final long serialVersionUID = -4755997517585953316L;
@TableId(value = "id")
@TableId(value = "id", type = IdType.AUTO)
@ApiModelProperty(value = "ID", hidden = true)
private Long id;
@ -64,4 +68,11 @@ public class EnterpriseWeChatUser extends BaseEntity implements Serializable {
@Column(name = "owner_name")
@ApiModelProperty(value = "维护人")
private String ownerName;
@Column(name = "account_status")
@ApiModelProperty(value = "账号状态")
private Integer accountStatus = DefaultNumberConstants.ONE_NUMBER;
}

@ -1,9 +1,12 @@
package com.baiye.entity;
import com.baiye.convert.EmojiConverterListJson;
import com.baomidou.mybatisplus.annotation.*;
import io.swagger.annotations.ApiModelProperty;
import lombok.Builder;
import lombok.Data;
import javax.persistence.Convert;
import java.io.Serializable;
import java.util.Date;
@ -12,10 +15,11 @@ import java.util.Date;
* @date 2022-8-1 19:07:36
*/
@Data
@Builder
@TableName("tb_message_record")
public class MessageRecord implements Serializable {
private static final long serialVersionUID = -7281246000750520229L;
private static final long serialVersionUID = -7281246000750520239L;
/**
* AUTO(0), //自增
@ -34,10 +38,11 @@ public class MessageRecord implements Serializable {
@ApiModelProperty(value = "用户微信id")
private String userWechatId;
@ApiModelProperty(value = "微信用户id")
@ApiModelProperty(value = "微信属于哪个用户id")
private Long userId;
@ApiModelProperty(value = "mq消息")
@ApiModelProperty(value = "消息内容")
@Convert(converter = EmojiConverterListJson.class)
private String msg;
@ApiModelProperty(value = "消息状态")
@ -50,7 +55,7 @@ public class MessageRecord implements Serializable {
private Integer isRead = 0;
@TableField(fill = FieldFill.INSERT)
@ApiModelProperty(value = "mq消息")
@ApiModelProperty(value = "创建时间")
private Date createTime;
@TableField(fill = FieldFill.INSERT_UPDATE)

@ -4,7 +4,6 @@ import com.baomidou.mybatisplus.annotation.*;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.Date;
@ -18,11 +17,10 @@ import java.util.Date;
public class PersonalWeChatUser {
@TableId(value = "id", type = IdType.INPUT)
@TableId(value = "id", type = IdType.AUTO)
@ApiModelProperty(value = "ID")
private Long id;
@ApiModelProperty(value = "微信id")
@TableField("we_chat_id")
private String weChatId;
@ -33,11 +31,11 @@ public class PersonalWeChatUser {
@ApiModelProperty(value = "昵称")
@TableField("nickname")
private String nickName;
private String nickname;
@ApiModelProperty(value = "名称")
@TableField("user_name")
private String userName;
@TableField("username")
private String username;
@ApiModelProperty(value = "性别")
@TableField("sex")
@ -45,12 +43,21 @@ public class PersonalWeChatUser {
@ApiModelProperty(value = "用户id")
@TableField("user_id")
private Long userId;
private Long userId;
@ApiModelProperty(value = "企业微信id")
@TableField("enterprise_id")
private String enterpriseId;
@ApiModelProperty(value = "是否星标")
@TableField("is_star")
private Boolean isStar = Boolean.FALSE;
/**
*
*/
@TableField(value = "create_time",fill = FieldFill.INSERT )
@TableField(value = "create_time", fill = FieldFill.INSERT)
@ApiModelProperty(value = "创建时间")
private Date createTime;
@ -59,7 +66,7 @@ public class PersonalWeChatUser {
@ApiModelProperty(value = "添加时间")
private Date addTime;
@TableField(value = "update_time",fill = FieldFill.INSERT_UPDATE)
@TableField(value = "update_time", fill = FieldFill.INSERT_UPDATE)
@ApiModelProperty(value = "添加时间")
private Date updateTime;
}

@ -44,6 +44,10 @@ public class QrCodeRecord implements Serializable {
@ApiModelProperty(value = "微信id")
private String wechatId;
@Column(name = "user_id")
@ApiModelProperty(value = "管理员id")
private Long userId;
@Column(name = "callback_url")
@ApiModelProperty(value = "回调接口")
private String callbackUrl;

@ -39,7 +39,7 @@ public class WechatMessageContent implements Serializable {
/**
* idid
*/
private Long toUserName;
private String toUserName;
/**
* 0- 1-
@ -49,6 +49,6 @@ public class WechatMessageContent implements Serializable {
/**
* id
*/
private Long wechat;
private String wechat;
}

@ -0,0 +1,46 @@
package com.baiye.query;
import com.baiye.annotation.Query;
import com.baiye.annotation.type.SelectType;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.sql.Timestamp;
import java.util.List;
/**
* @author
* @enzo
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WeChatPersonQuery {
/**
*
*/
@Query
private Long userId;
/**
*
*/
@Query(type = SelectType.INNER_LIKE)
private String nickname;
/**
*
*/
@Query(type = SelectType.BETWEEN)
private List<Timestamp> createTime;
/**
*
*/
@Query
private Boolean isStart;
}

@ -48,7 +48,6 @@
<artifactId>jasypt-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.baiye</groupId>
<artifactId>cdp-wechat-core</artifactId>
@ -57,7 +56,5 @@
</dependencies>
</project>

@ -19,4 +19,12 @@ public interface PersonalWeChatUserMapper extends BaseMapper<PersonalWeChatUser>
* @return
*/
List<Long> selectUserIdListByCurrentUserId(Long currentUserId);
/**
* id
* @param userWechatId
* @return
*/
List<String> selectUserIdListByWeChatId(String userWechatId);
}

@ -0,0 +1,28 @@
package com.baiye.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author Enzo
* @date : 2022/8/10
*/
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "aliyun.oss.file")
public class AliYunOssProperties {
private String bucketName;
private String endpoint;
private String accessKeyId;
private String accessKeySecret;
}

@ -2,6 +2,7 @@ package com.baiye.service;
import com.baiye.core.page.PageResult;
import com.baiye.dto.ResponseMessageRecordDTO;
import com.baiye.dto.WebsocketMessageRequestDTO;
import com.baiye.entity.MessageRecord;
import com.baiye.query.MessageRecordQuery;
import com.baomidou.mybatisplus.extension.service.IService;
@ -50,4 +51,11 @@ public interface MessageRecordService extends IService<MessageRecord> {
* @param userId
*/
void notReadMessage(Long userId);
/**
*
* @param websocketMessageRequestDTO dto
* @return
*/
Boolean saveMessageByUserId(WebsocketMessageRequestDTO websocketMessageRequestDTO);
}

@ -1,8 +1,11 @@
package com.baiye.service;
import com.baiye.core.page.PageResult;
import com.baiye.dto.PersonalWeChatUserDTO;
import com.baiye.entity.PersonalWeChatUser;
import com.baiye.query.WeChatPersonQuery;
import com.baomidou.mybatisplus.extension.service.IService;
import org.springframework.data.domain.Pageable;
import java.util.List;
@ -19,6 +22,14 @@ public interface PersonalWeChatUserService extends IService<PersonalWeChatUser>
*/
List<Long> selectUserIdListByCurrentUserId(Long currentUserId);
/**
* idid
* @param userWechatId
* @return
*/
List<String> selectUserIdListByCurrentUserId(String userWechatId);
/**
* id
* @param userWechatId
@ -32,4 +43,31 @@ public interface PersonalWeChatUserService extends IService<PersonalWeChatUser>
* @return
*/
PersonalWeChatUserDTO queryByUserId(Long userId);
/**
* user
* @param weChatWordsQuery
* @param pageable
* @return
*/
PageResult selectPage(WeChatPersonQuery weChatWordsQuery, Pageable pageable);
/**
*
* @param enterpriseWechat
* @param userWechat
* @return
*/
PersonalWeChatUser findByWechat(String enterpriseWechat,String userWechat);
/**
*
*
* @param userId
* @param currentUserId
* @param startFlag
*/
void changeUserStart(Long userId, Long currentUserId, Boolean startFlag);
}

@ -6,7 +6,6 @@ import com.baiye.core.constant.WeChatResponse;
import com.baiye.core.page.PageResult;
import com.baiye.dto.*;
import com.baiye.entity.EnterpriseWeChatUser;
import com.baiye.entity.MessageRecord;
import com.baiye.query.WeChatUserQuery;
import com.baomidou.mybatisplus.extension.service.IService;
import org.springframework.data.domain.Pageable;
@ -42,9 +41,10 @@ public interface WeChatUserService extends IService<EnterpriseWeChatUser> {
* @param uuid
* @param appKey
* @param appSecret
* @param currentUserId
* @return
*/
Result<String> getQrCode(String wechatId, String uuid, String appKey, String appSecret);
Result<String> getQrCode(String wechatId, String uuid, String appKey, String appSecret, Long currentUserId);
/**
*
@ -137,12 +137,24 @@ public interface WeChatUserService extends IService<EnterpriseWeChatUser> {
*/
Result<Object> sendGif(WeChatSendGifDTO weChatSendGifDTO);
/**
* id
* @param currentUserId
* @return
*/
List<String> getWeChatIds(Long currentUserId);
/**
* 线
* @param currentUserId
* @return
*/
List<String> selectWechatIdList(Long currentUserId);
List<EnterpriseWeChatUser> findByWeChatList();
/**
* id
* @param wechat
* @return
*/
EnterpriseWeChatUser findByWechat(String wechat);
}

@ -15,6 +15,7 @@ import java.util.Set;
public interface WeChatWordsService extends IService<WeChatWords> {
/**
*
*
* @param resources
*/
void create(WeChatWords resources);

@ -1,28 +1,31 @@
package com.baiye.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjectUtil;
import com.baiye.core.constant.DefaultNumberConstants;
import com.baiye.core.constant.EncryptionConstants;
import com.baiye.core.page.PageResult;
import com.baiye.core.util.AesEncrypt;
import com.baiye.dao.MessageRecordMapper;
import com.baiye.dto.MessageRecordDTO;
import com.baiye.dto.PersonalWeChatUserDTO;
import com.baiye.dto.ResponseMessageRecordDTO;
import com.baiye.dto.WebsocketMessageRequestDTO;
import com.baiye.entity.MessageRecord;
import com.baiye.entity.SourceUploadRecord;
import com.baiye.mapstruct.MessageRecordMapStruct;
import com.baiye.query.MessageRecordQuery;
import com.baiye.service.MessageRecordService;
import com.baiye.service.PersonalWeChatUserService;
import com.baiye.util.PageUtils;
import com.baiye.util.QueryHelpUtils;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
@ -41,10 +44,16 @@ public class MessageRecordServiceImpl extends
private final MessageRecordMapStruct recordMapStruct;
private final MessageRecordMapper messageRecordMapper;
private final PersonalWeChatUserService personalWeChatUserService;
@Value("${snowflake.workerId}")
private int workerId;
@Value("${snowflake.datacenterId}")
private int datacenterId;
@Override
public List<ResponseMessageRecordDTO> queryAll(Pageable pageable, Long currentUserId) {
@ -76,7 +85,8 @@ public class MessageRecordServiceImpl extends
// 查询用户
PersonalWeChatUserDTO user =
personalWeChatUserService.queryByUserId(userId);
user.setEnterpriseId(AesEncrypt.encrypt
(user.getEnterpriseId(),EncryptionConstants.ENCRYPTION_KEY));
List<MessageRecordDTO> dtoList =
recordMapStruct.toDto(list);
// 获取最后一条数据
@ -87,6 +97,7 @@ public class MessageRecordServiceImpl extends
}
recordDTO.setIsRead
(count > DefaultNumberConstants.ZERO_NUMBER);
recordDTO.setIsStart(user.getIsStar());
recordDTO.setWeChatUser(user);
recordDTO.setNotReadNum(count);
recordDTO.setMessageRecordList(dtoList);
@ -95,9 +106,9 @@ public class MessageRecordServiceImpl extends
}
}
// 排序 先以是否含有未读倒序,再以时间降序
// 排序 先以是否星标 -> 是否含有未读倒序 -> 再以时间降序
messageRecord.sort
(comparing(ResponseMessageRecordDTO::getIsRead)
(comparing(ResponseMessageRecordDTO::getIsStart).thenComparing(ResponseMessageRecordDTO::getIsRead)
.thenComparing(ResponseMessageRecordDTO::getLastMessageTime).reversed());
return messageRecord;
@ -119,11 +130,12 @@ public class MessageRecordServiceImpl extends
@Override
public PageResult queryMessageByUserId(MessageRecordQuery messageRecordQuery, Pageable pageable) {
Page<MessageRecord> page = PageUtils.startPageAndSort(pageable);
QueryWrapper<MessageRecord> queryWrapper =
QueryHelpUtils.getWrapper(messageRecordQuery, MessageRecord.class);
Wrapper<MessageRecord> queryWrapper =
new LambdaQueryWrapper<MessageRecord>().eq
(MessageRecord::getUserId, messageRecordQuery.getUserId());
Page<MessageRecord> selectPage = this.page(page, queryWrapper);
return PageResult.success
(selectPage.getTotal(), selectPage.getPages(), selectPage.getRecords());
(selectPage.getTotal(), selectPage.getPages(), this.recordMapStruct.toDto(selectPage.getRecords()));
}
@ -132,4 +144,19 @@ public class MessageRecordServiceImpl extends
this.baseMapper.updateNotReadMessageByUserId
(DefaultNumberConstants.ZERO_NUMBER, DefaultNumberConstants.ONE_NUMBER, userId);
}
@Override
public Boolean saveMessageByUserId(WebsocketMessageRequestDTO websocketMessageRequestDTO) {
Long messageRecordId = IdUtil.getSnowflake(workerId, datacenterId).nextId();
MessageRecord build = MessageRecord.builder().id(messageRecordId).msgFlag(Boolean.TRUE).
wechatId(websocketMessageRequestDTO.getEnterpriseId()).
userWechatId(websocketMessageRequestDTO.getUserWechatId()).
msg(websocketMessageRequestDTO.getContent()).
userId(1L).
status(DefaultNumberConstants.ONE_NUMBER).
msgType(websocketMessageRequestDTO.getMessageType()).
isRead(DefaultNumberConstants.ONE_NUMBER).build();
return this.save(build);
}
}

@ -1,14 +1,26 @@
package com.baiye.service.impl;
import com.baiye.core.constant.EncryptionConstants;
import com.baiye.core.page.PageResult;
import com.baiye.core.util.AesEncrypt;
import com.baiye.dao.PersonalWeChatUserMapper;
import com.baiye.dto.PersonalWeChatUserDTO;
import com.baiye.entity.PersonalWeChatUser;
import com.baiye.mapstruct.PersonalWeChatUserStruct;
import com.baiye.query.WeChatPersonQuery;
import com.baiye.service.PersonalWeChatUserService;
import com.baiye.util.PageUtils;
import com.baiye.util.QueryHelpUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import java.util.List;
@ -31,6 +43,11 @@ public class PersonalWeChatUserServiceImpl
return personalWeChatUserMapper.selectUserIdListByCurrentUserId(currentUserId);
}
@Override
public List<String> selectUserIdListByCurrentUserId(String userWechatId) {
return personalWeChatUserMapper.selectUserIdListByWeChatId(userWechatId);
}
@Override
public List<PersonalWeChatUser> queryByWechatId(String userWechatId) {
return personalWeChatUserMapper.
@ -38,7 +55,38 @@ public class PersonalWeChatUserServiceImpl
}
@Override
@Cacheable(cacheNames = "personalWeChatUserCacheByUserId",key = "'personal:' + #userId")
public PersonalWeChatUserDTO queryByUserId(Long userId) {
return personalWeChatUserStruct.toDto(this.personalWeChatUserMapper.selectById(userId));
}
@Override
public PageResult selectPage(WeChatPersonQuery weChatWordsQuery, Pageable pageable) {
Page<PersonalWeChatUser> page = PageUtils.startPageAndSort(pageable);
QueryWrapper<PersonalWeChatUser> queryWrapper =
QueryHelpUtils.getWrapper(weChatWordsQuery, PersonalWeChatUser.class);
Page<PersonalWeChatUser> personalWeChatUserPage = this.page(page, queryWrapper);
personalWeChatUserPage.getRecords().forEach
(user -> user.setEnterpriseId(AesEncrypt.encrypt
(user.getEnterpriseId(), EncryptionConstants.ENCRYPTION_KEY)));
return PageResult.success(personalWeChatUserPage.getTotal(),
personalWeChatUserPage.getPages(), personalWeChatUserStruct.toDto(personalWeChatUserPage.getRecords()));
}
@Override
@Cacheable(cacheNames = "personalWeChatUserCache",key = "'personal:' + #enterpriseWechat + #userWechat")
public PersonalWeChatUser findByWechat(String enterpriseWechat, String userWechat) {
return this.getOne(new LambdaQueryWrapper<PersonalWeChatUser>().eq
(PersonalWeChatUser::getEnterpriseId, enterpriseWechat).
eq(PersonalWeChatUser::getWeChatId, userWechat));
}
@Override
@CacheEvict(cacheNames = "personalWeChatUserCacheByUserId",key = "'personal:' + #currentUserId")
public void changeUserStart(Long userId, Long currentUserId, Boolean startFlag) {
this.update(new LambdaUpdateWrapper<PersonalWeChatUser>().
eq(PersonalWeChatUser::getUserId, userId).set(PersonalWeChatUser::getIsStar, startFlag));
}
}

@ -4,7 +4,6 @@ import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.http.HttpStatus;
import cn.hutool.http.HttpUtil;
@ -12,9 +11,12 @@ import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.baiye.core.base.api.Result;
import com.baiye.core.base.api.ResultCode;
import com.baiye.core.constant.DefaultNumberConstants;
import com.baiye.core.constant.EncryptionConstants;
import com.baiye.core.constant.WeChatRequestConstants;
import com.baiye.core.constant.WeChatResponse;
import com.baiye.core.page.PageResult;
import com.baiye.core.util.AesEncrypt;
import com.baiye.core.util.DesUtil;
import com.baiye.dao.*;
import com.baiye.dto.*;
@ -25,6 +27,7 @@ import com.baiye.query.WeChatUserQuery;
import com.baiye.service.WeChatUserService;
import com.baiye.util.PageUtils;
import com.baiye.util.QueryHelpUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@ -35,6 +38,7 @@ import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
@ -99,7 +103,7 @@ public class WeChatUserServiceImpl extends
}
@Override
public Result<String> getQrCode(String wechatId, String uuid, String appKey, String appSecret) {
public Result<String> getQrCode(String wechatId, String uuid, String appKey, String appSecret, Long currentUserId) {
if (StringUtils.isBlank(wechatId) && StringUtils.isBlank(uuid)) {
throw new BadRequestException(ResultCode.PARAMETER_ERROR.getMsg());
}
@ -107,6 +111,7 @@ public class WeChatUserServiceImpl extends
Map<String, Object> getQrCodeMap = Maps.newConcurrentMap();
QrCodeRecord qrCodeRecord = new QrCodeRecord();
qrCodeRecord.setUuid(uuid);
qrCodeRecord.setUserId(currentUserId);
getQrCodeMap.put("uuid", uuid);
if (StringUtils.isNotBlank(wechatId)) {
getQrCodeMap.put("wechat", wechatId);
@ -168,26 +173,23 @@ public class WeChatUserServiceImpl extends
userLoginDTO.setCusQrUuId(cusQrUuId);
}
// 插入用户
if (weChatResponse.getStatus() == HttpStatus.HTTP_CREATED) {
EnterpriseWeChatUser byUserId = enterpriseWeChatUserMapper.findByWxId(userLoginDTO.getWxid());
saveOrUpdateWeChatUser(userLoginDTO, byUserId);
}
// 插入日志
saveResponseLog(weChatResponse, cusQrUuId);
List<QrCodeRecord> byUuid = qrCodeRecordMapper.findByUuid(cusQrUuId);
if (CollUtil.isNotEmpty(byUuid) && StringUtils.isNotBlank(byUuid.get(0).getAppKey())) {
String appKey = byUuid.get(0).getAppKey();
if (StringUtils.isNotBlank(appKey)) {
AppUser byAppKey = appUserMapper.findByAppKey(appKey.trim());
map.put("data", userLoginDTO);
map.put("msg", weChatResponse.getMsg());
map.put("status", weChatResponse.getStatus());
// 发送请求
HttpUtil.post(byAppKey.getCallbackUrl(), JSONUtil.toJsonStr(map));
if (CollUtil.isNotEmpty(byUuid) &&
ObjectUtil.isNotNull(byUuid.get(DefaultNumberConstants.ZERO_NUMBER))) {
Long userId = byUuid.get(DefaultNumberConstants.ZERO_NUMBER).getUserId();
if (ObjectUtil.isNotNull(userId)) {
// 插入用户
if (weChatResponse.getStatus() == HttpStatus.HTTP_CREATED) {
EnterpriseWeChatUser byUserId = enterpriseWeChatUserMapper.findByWxId(userLoginDTO.getWxid());
saveOrUpdateWeChatUser(userLoginDTO, byUserId, userId);
}
log.info("============= the send callback success uuid as {} date as {} ============", cusQrUuId, DateUtil.date());
}
}
// 插入日志
saveResponseLog(weChatResponse, cusQrUuId);
}
}
@ -269,18 +271,20 @@ public class WeChatUserServiceImpl extends
weChatCallbackLogMapper.insert(callbackLog);
}
private void saveOrUpdateWeChatUser(WeChatUserLoginDTO userLoginDTO, EnterpriseWeChatUser byUserId) {
private void saveOrUpdateWeChatUser(WeChatUserLoginDTO userLoginDTO, EnterpriseWeChatUser byUserId, Long userId) {
if (ObjectUtil.isNull(byUserId)) {
byUserId = new EnterpriseWeChatUser();
}
if (userLoginDTO instanceof WeChatUserLoginSuccessDTO) {
WeChatUserLoginSuccessDTO successDTO = (WeChatUserLoginSuccessDTO) userLoginDTO;
byUserId.setOwnerId(userId);
byUserId.setLoginDate(DateUtil.date());
byUserId.setRandomId(successDTO.getRandomid());
byUserId.setToken(successDTO.getToken());
byUserId.setWxId(successDTO.getWxid());
byUserId.setLoginType(successDTO.getLoginType());
byUserId.setToken(successDTO.getToken());
byUserId.setNickname(successDTO.getNickname());
byUserId.setRandomId(successDTO.getRandomid());
byUserId.setLoginType(successDTO.getLoginType());
byUserId.setAccountStatus(DefaultNumberConstants.ONE_NUMBER);
byUserId.setSmallHeadImgUrl(successDTO.getSmallHeadImgUrl());
enterpriseWeChatUserMapper.insert(byUserId);
}
@ -360,16 +364,30 @@ public class WeChatUserServiceImpl extends
@Override
public List<String> getWeChatIds(Long currentUserId) {
List<String> wechatIdList = enterpriseWeChatUserMapper.selectWechatIdList(currentUserId);
List<String> wechatIdList =
enterpriseWeChatUserMapper.selectWechatIdList(currentUserId);
List<String> list = Lists.newArrayList();
wechatIdList.forEach(wechatId ->{
list.add(IdUtil.fastUUID());
// TODO 转让需修改代码
wechatIdList.forEach(wechatId -> {
if (StringUtils.isNotBlank(wechatId)) {
list.add(AesEncrypt.encrypt
(wechatId, EncryptionConstants.ENCRYPTION_KEY));
}
});
return list;
}
@Override
public List<EnterpriseWeChatUser> findByWeChatList() {
return this.list(new LambdaQueryWrapper<EnterpriseWeChatUser>().eq
(EnterpriseWeChatUser::getAccountStatus, DefaultNumberConstants.ONE_NUMBER));
}
@Override
public List<String> selectWechatIdList(Long currentUserId) {
return null;
@Cacheable(cacheNames = "enterpriseWeChatUserCache", key = "'wechatId:' + #wechat")
public EnterpriseWeChatUser findByWechat(String wechat) {
return this.getOne(new LambdaQueryWrapper<EnterpriseWeChatUser>().eq(EnterpriseWeChatUser::getWxId, wechat));
}
}

@ -33,8 +33,8 @@ public class WeChatWordsServiceImpl extends ServiceImpl<WeChatWordsMapper, WeCha
@Override
@Transactional(rollbackFor = Exception.class)
public void create(WeChatWords resources) {
// resources.setUserId(SecurityUtils.getCurrentUserId());
public void create( WeChatWords resources) {
this.save(resources);
}
@ -57,7 +57,6 @@ public class WeChatWordsServiceImpl extends ServiceImpl<WeChatWordsMapper, WeCha
@Override
public PageResult queryWeChatWords(WeChatWordsQuery weChatWordsQuery, Pageable pageable) {
weChatWordsQuery.setUserId(1L);
Page<WeChatWords> page = PageUtils.startPageAndSort(pageable);
QueryWrapper<WeChatWords> queryWrapper = QueryHelpUtils.getWrapper(weChatWordsQuery, WeChatWords.class);
Page<WeChatWords> weChatWordsPage = this.page(page, queryWrapper);

@ -21,6 +21,7 @@
</select>
<select id="selectWechatIdList" resultType="string">
select wx_id from tb_enterprise_wechat_user where owner_id = #{currentUserId}
SELECT enterprise_id FROM tb_personal_wechat_user WHERE
user_id = #{currentUserId,jdbcType=BIGINT} GROUP BY enterprise_id
</select>
</mapper>

@ -8,4 +8,8 @@
<select id="selectUserIdListByCurrentUserId" resultType="java.lang.Long">
select id from tb_personal_wechat_user where user_id = #{currentUserId,jdbcType=BIGINT}
</select>
<select id="selectUserIdListByWeChatId" resultType="java.lang.String">
select we_chat_id from tb_personal_wechat_user where enterprise_id = #{userWechatId,jdbcType=VARCHAR}
</select>
</mapper>

@ -10,9 +10,9 @@
<select id="findByUuid" resultType="com.baiye.entity.QrCodeRecord">
select * from tb_qr_code_record
<where>
<if test="uuid!=null ">
and uuid = #{uuid}
<if test="uuid !=null ">
and uuid = #{uuid} or wechat_id = #{uuid} order by id desc
</if>
</where>
</select>
</mapper>
</mapper>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 332 KiB

After

Width:  |  Height:  |  Size: 0 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 55 KiB

After

Width:  |  Height:  |  Size: 0 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 194 KiB

After

Width:  |  Height:  |  Size: 0 B

@ -57,6 +57,7 @@
<spring-boot.version>2.3.2.RELEASE</spring-boot.version>
<net.sf.supercsv.version>2.4.0</net.sf.supercsv.version>
<mysql-connector.version>5.1.47</mysql-connector.version>
<alicloud-oss.version>2.2.0.RELEASE</alicloud-oss.version>
<org.mapstruct.version>1.3.1.Final</org.mapstruct.version>
<commons-beanutils.version>1.9.4</commons-beanutils.version>
<springside-utils.version>5.0.0-RC1</springside-utils.version>
@ -276,6 +277,12 @@
<version>5.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alicloud-oss</artifactId>
<version>${alicloud-oss.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Loading…
Cancel
Save