更新大数据相关的测试和发送逻辑
parent
14cc1c8eed
commit
ca9cdafe4b
@ -1,4 +1,4 @@
|
||||
package me.zhengjie.common;
|
||||
package me.zhengjie.common.http;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
@ -1,4 +1,4 @@
|
||||
package me.zhengjie.common;
|
||||
package me.zhengjie.common.http;
|
||||
|
||||
/**
|
||||
* FileName: ResponseCode
|
@ -0,0 +1,19 @@
|
||||
package me.zhengjie.modules.buildrecord.rest.vo;
|
||||
|
||||
import lombok.Data;
|
||||
import me.zhengjie.modules.buildrecord.domain.BuildRecord;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
|
||||
@Data
|
||||
public class BuildRecordBuildVO implements Serializable {
|
||||
|
||||
/**
|
||||
* 前端相关的信息实体
|
||||
*/
|
||||
private BuildRecord resource;
|
||||
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,41 @@
|
||||
package me.zhengjie.modules.buildrecord.rest.vo;
|
||||
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import me.zhengjie.modules.buildrecord.domain.BuildRecord;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class BuildRecordSendVO implements Serializable {
|
||||
|
||||
/**
|
||||
* 该次发送的个数限
|
||||
*/
|
||||
private Long limit;
|
||||
|
||||
/**
|
||||
* 发送的活动任务Id,对应其他平台的活动Id库
|
||||
*/
|
||||
private Long taskId;
|
||||
|
||||
/**
|
||||
* 对应下游推送环境标识, 占时用数字来进行标识 0 标识为测试环境,1-5为正式环境
|
||||
*/
|
||||
private String addressTag;
|
||||
|
||||
/**
|
||||
* 前端相关的信息实体
|
||||
*/
|
||||
private BuildRecord resource;
|
||||
|
||||
/**
|
||||
* 需要发送的活动名称
|
||||
*/
|
||||
private String sendName;
|
||||
}
|
@ -0,0 +1,127 @@
|
||||
package me.zhengjie.modules.buildrecord.task;
|
||||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.spatial4j.core.context.SpatialContext;
|
||||
import com.spatial4j.core.distance.DistanceUtils;
|
||||
import com.spatial4j.core.shape.Rectangle;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import me.zhengjie.modules.buildrecord.domain.BuildRecord;
|
||||
import me.zhengjie.modules.buildrecord.task.convert.BuildTaskQueryParamJsonConvert;
|
||||
import me.zhengjie.modules.edu.service.EduService;
|
||||
import me.zhengjie.modules.tag.service.TagService;
|
||||
import me.zhengjie.modules.tag.service.dto.TagDto;
|
||||
import me.zhengjie.utils.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.PageRequest;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Component
|
||||
@Scope("prototype")
|
||||
@Slf4j
|
||||
public class ProduceBigDataTask {
|
||||
|
||||
/**
|
||||
* 成功构建标识 1
|
||||
*/
|
||||
private static final int SUCCESS_BUILD_TAG = 1;
|
||||
|
||||
/**
|
||||
* 字符串切割标识 - 目前用于横纵坐标的切割
|
||||
*/
|
||||
private static final String SPLIT_TAG = ",";
|
||||
|
||||
@Autowired
|
||||
private EduService eduService;
|
||||
@Autowired
|
||||
private TagService tagService;
|
||||
|
||||
|
||||
|
||||
@Async(value = "ProduceBigDataTaskExecutor")
|
||||
public void doRunTask(BuildRecord task){
|
||||
Long satrtMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
|
||||
log.info("====== [ task start running, task name is {} ] ======", "ProduceBigDataTask");
|
||||
runTask(task);
|
||||
Long endMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
|
||||
log.info("====== [ task start end, task name is {},cost milliSecond is {} ] ======", "ProduceBigDataTask", (endMilliSecond - satrtMilliSecond));
|
||||
}
|
||||
|
||||
private void runTask(BuildRecord task) {
|
||||
// 解析需要的查询参数
|
||||
String params = task.getParams();
|
||||
if (StringUtils.isBlank(params)){
|
||||
return;
|
||||
}
|
||||
// 构建解析类对Json进行解析
|
||||
BuildTaskQueryParamJsonConvert convert = JSON.parseObject(params, BuildTaskQueryParamJsonConvert.class);
|
||||
// 地图点进行计算,获取选点范围
|
||||
String localCode = convert.getLocalCode();
|
||||
Integer range = convert.getRange();
|
||||
Rectangle rectangle = null;
|
||||
if (StringUtils.isNotBlank(localCode) && range != null){
|
||||
rectangle = calculateLocalPoint(localCode, range);
|
||||
}
|
||||
if (rectangle == null && StringUtils.isNotBlank(localCode)){
|
||||
log.error(" ====================== {please check localPoint calculate is ok ! } ==========================");
|
||||
return;
|
||||
}
|
||||
// 进行两表关联进行复杂查询
|
||||
Pageable pageable = PageRequest.of(0,200);
|
||||
while(true){
|
||||
// 分页进行查询结果
|
||||
Page<String> page = eduService.queryMatchData(convert.getCityCode(), convert.getStuGrade(), rectangle, pageable);
|
||||
if (CollectionUtil.isEmpty(page)){
|
||||
return;
|
||||
}
|
||||
// 把待发送记录存入分表
|
||||
List<TagDto> tagDtos = new ArrayList<>();
|
||||
page.forEach(
|
||||
each->{
|
||||
TagDto tagDto = new TagDto();
|
||||
tagDto.setUid(each.trim());
|
||||
tagDto.setPushStatus(0);
|
||||
tagDto.setTaskId(task.getTaskBuildId());
|
||||
tagDtos.add(tagDto);
|
||||
}
|
||||
);
|
||||
Long aLong = tagService.saveAll(tagDtos);
|
||||
if (aLong <= 0){
|
||||
log.error("==== [save send record fail, please check , ready insert record is {} , activity id is {}] ====", tagDtos.toString(), task.getTaskBuildId());
|
||||
}
|
||||
if (!page.hasNext()){
|
||||
break;
|
||||
}
|
||||
pageable = page.nextPageable();
|
||||
task.setTotal(page.getTotalElements());
|
||||
}
|
||||
// 生成完成后,对各种状态进行记录
|
||||
task.setIsBuild(SUCCESS_BUILD_TAG);
|
||||
}
|
||||
|
||||
private Rectangle calculateLocalPoint(String localCode, Integer range) {
|
||||
// 处理切割横纵坐标字符串
|
||||
String[] split = StringUtils.split(localCode.trim(), SPLIT_TAG);
|
||||
if (split.length != 2){
|
||||
return null;
|
||||
}
|
||||
// 转换类型
|
||||
Double x = Double.valueOf(split[0]);
|
||||
Double y = Double.valueOf(split[1]);
|
||||
// 坐标计算
|
||||
SpatialContext geo = SpatialContext.GEO;
|
||||
Rectangle rectangle = geo.getDistCalc().calcBoxByDistFromPt(
|
||||
geo.makePoint(x, y), range * DistanceUtils.KM_TO_DEG, geo, null
|
||||
);
|
||||
return rectangle;
|
||||
}
|
||||
}
|
@ -0,0 +1,176 @@
|
||||
package me.zhengjie.modules.buildrecord.task;
|
||||
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import cn.hutool.http.HttpResponse;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import me.zhengjie.common.json.PushDBJsonContent;
|
||||
import me.zhengjie.modules.buildrecord.domain.BuildRecord;
|
||||
import me.zhengjie.modules.buildrecord.service.BuildRecordService;
|
||||
import me.zhengjie.modules.buildrecord.service.dto.BuildRecordDto;
|
||||
import me.zhengjie.modules.buildrecord.task.dto.SendBigDataDTO;
|
||||
import me.zhengjie.modules.tag.service.TagService;
|
||||
import me.zhengjie.modules.tag.service.dto.TagDto;
|
||||
import me.zhengjie.modules.tag.service.dto.TagQueryCriteria;
|
||||
import me.zhengjie.utils.HttpUtil;
|
||||
import me.zhengjie.utils.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Component
|
||||
@Scope("prototype")
|
||||
@Slf4j
|
||||
public class SendBigDataTask {
|
||||
|
||||
/**
|
||||
* 未发送的状态
|
||||
*/
|
||||
private static final int NON_FINISH_SEND_STATATUS = 0;
|
||||
|
||||
/**
|
||||
* 每次发送条数限制
|
||||
*/
|
||||
private static final int SEND_LIMIT = 500;
|
||||
|
||||
/**
|
||||
* 完成发送任务标识
|
||||
*/
|
||||
private static final int FINISH_SEND_TAG = 1;
|
||||
|
||||
/**
|
||||
* 用于随机选的环境代替字符, 如果添加环境就进行自动添加
|
||||
*/
|
||||
public static final String BASE_URL_CHAR_NUMBER = "12345";
|
||||
|
||||
/**
|
||||
* 下游发送url
|
||||
*/
|
||||
@Value(value = "${req.db.url}")
|
||||
private String url;
|
||||
|
||||
/**
|
||||
* 下游推送ip:端口
|
||||
*/
|
||||
@Value(value = "${req.db.host}")
|
||||
private String host;
|
||||
|
||||
|
||||
@Autowired
|
||||
private BuildRecordService buildRecordService;
|
||||
@Autowired
|
||||
private TagService tagService;
|
||||
|
||||
@Async(value = "SendBigDataTaskExecutor")
|
||||
public void doRunTask(BuildRecord resource, SendBigDataDTO sendBigDataDTO){
|
||||
Long satrtMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
|
||||
log.info("====== [ task start running, task name is {} ] ======", "SendBigDataTask");
|
||||
runTask(resource, sendBigDataDTO);
|
||||
Long endMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
|
||||
log.info("====== [ task start end, task name is {},cost milliSecond is {} ] ======", "SendBigDataTask", (endMilliSecond - satrtMilliSecond));
|
||||
}
|
||||
|
||||
private void runTask(BuildRecord resource, SendBigDataDTO sendBigDataDTO) {
|
||||
// 根据发送任务的Id来读取发送号码表
|
||||
Integer id = resource.getId();
|
||||
if (id <= 0 ){
|
||||
return;
|
||||
}
|
||||
BuildRecordDto buildRecordDto = buildRecordService.findById(id);
|
||||
// 查询记录表,返回这个id去发送记录表中进行查找 - 构建发送体
|
||||
TagQueryCriteria tagQueryCriteria = new TagQueryCriteria();
|
||||
tagQueryCriteria.setTaskId((long)id);
|
||||
List<TagDto> tagDtos = tagService.queryAll(tagQueryCriteria);
|
||||
// 遍历查询等待发送的列表
|
||||
List<TagDto> collect = tagDtos.stream()
|
||||
.filter(one -> one.getPushStatus() == NON_FINISH_SEND_STATATUS)
|
||||
.distinct()
|
||||
.limit(sendBigDataDTO.getLimit())
|
||||
.collect(Collectors.toList());
|
||||
// 对需要发送的字段进行发送
|
||||
batchSend(collect, sendBigDataDTO, buildRecordDto);
|
||||
// 对发送后的状态进行更新
|
||||
resource.setIsSend(FINISH_SEND_TAG);
|
||||
}
|
||||
|
||||
private void batchSend(List<TagDto> collect, SendBigDataDTO sendRecordDTO, BuildRecordDto buildRecordDto) {
|
||||
List<List<TagDto>> partition = Lists.partition(collect, SEND_LIMIT);
|
||||
partition.forEach(
|
||||
list->{
|
||||
// 调用推送地址进行推送
|
||||
PushDBJsonContent pushDBJsonContent = new PushDBJsonContent();
|
||||
pushDBJsonContent.setActId(sendRecordDTO.getTaskId());
|
||||
pushDBJsonContent.setActName(sendRecordDTO.getSendName());
|
||||
// 加入每一个号码对应接口字段
|
||||
List<PushDBJsonContent.Client> clientList = new ArrayList<>();
|
||||
list.forEach(
|
||||
each->{
|
||||
PushDBJsonContent.Client client = new PushDBJsonContent.Client();
|
||||
client.setCellphone(each.getUid());
|
||||
clientList.add(client);
|
||||
// 推送完成后修改状态为 已经推送
|
||||
each.setPushStatus(FINISH_SEND_TAG);
|
||||
}
|
||||
);
|
||||
pushDBJsonContent.setClientList(clientList);
|
||||
// 进行Json解析
|
||||
String jsonStr = JSON.toJSONString(pushDBJsonContent);
|
||||
if (StringUtils.isNotBlank(jsonStr)){
|
||||
log.info("============ [ Pre send Json is : {} ] ============", jsonStr);
|
||||
int count = 1;
|
||||
// 失败重发请求3次
|
||||
while (count <= 3){
|
||||
// 对发送请求地址进行准备
|
||||
String addressTag = sendRecordDTO.getAddressTag();
|
||||
String address ="";
|
||||
if (StringUtils.isNotBlank(addressTag)){
|
||||
address = preSendReqAddress(addressTag);
|
||||
log.info("========== [DB request address is {} ] =========", address);
|
||||
}
|
||||
// 调用HTTP请求发送数据
|
||||
HttpResponse httpResponse = HttpUtil.sendPostReq(address, jsonStr);
|
||||
if (httpResponse.isOk() && httpResponse.body().contains("true")){
|
||||
log.info("========== [DB request success, response is {} ] ==========", httpResponse.body());
|
||||
break;
|
||||
}else{
|
||||
count ++;
|
||||
log.error("========== [DB request fail, response is {} ] ==========", httpResponse.body());
|
||||
}
|
||||
}
|
||||
if (count > 3) {
|
||||
log.error("========== [DB update send status fail, url is {} ] ==========", url);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// 把修改完状态的进行更新
|
||||
Long aLong = tagService.saveAll(collect);
|
||||
if (aLong <= 0){
|
||||
log.error("================= [update data fail , please check it .] =================");
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private String preSendReqAddress(String tag) {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
// 处理环境选择问题, 只接受单个 a, b, c 中的任意一个输入,如果不输入就进行随机返回
|
||||
tag = (StringUtils.isNotBlank(tag) && StringUtils.countMatches(BASE_URL_CHAR_NUMBER, tag.trim()) == 1)?
|
||||
tag.trim(): String.valueOf(RandomUtil.randomChar(BASE_URL_CHAR_NUMBER));
|
||||
// 拼接成需要的URL进行返回
|
||||
builder.append(host)
|
||||
.append(tag)
|
||||
.append(url);
|
||||
// 返回拼接结果
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,34 @@
|
||||
package me.zhengjie.modules.buildrecord.task.convert;
|
||||
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 用于对传入的json体进行解析
|
||||
*/
|
||||
@Data
|
||||
public class BuildTaskQueryParamJsonConvert {
|
||||
|
||||
/**
|
||||
* 城市编码集合
|
||||
*/
|
||||
private List<Integer> cityCode;
|
||||
|
||||
/**
|
||||
* 报课年级集合
|
||||
*/
|
||||
private List<Integer> stuGrade;
|
||||
|
||||
/**
|
||||
* 坐标点 用逗号进行分割
|
||||
*/
|
||||
private String localCode;
|
||||
|
||||
/**
|
||||
* 坐标的辐射范围, 单位 公里
|
||||
*/
|
||||
private Integer range;
|
||||
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
package me.zhengjie.modules.buildrecord.task.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 同于任务的DTO模型Bean
|
||||
*/
|
||||
@Data
|
||||
public class SendBigDataDTO {
|
||||
/**
|
||||
* 该次发送的个数限
|
||||
*/
|
||||
private Long limit;
|
||||
|
||||
/**
|
||||
* 发送的活动任务Id,对应其他平台的活动Id库
|
||||
*/
|
||||
private Long taskId;
|
||||
|
||||
/**
|
||||
* 对应下游推送环境标识, 占时用数字来进行标识 0 标识为测试环境,1-5为正式环境
|
||||
*/
|
||||
private String addressTag;
|
||||
|
||||
/**
|
||||
* 需要发送的活动名称
|
||||
*/
|
||||
private String sendName;
|
||||
}
|
@ -1,12 +1,13 @@
|
||||
package me.zhengjie.modules.taskrecord.rest.vo;
|
||||
|
||||
import lombok.Data;
|
||||
import me.zhengjie.modules.buildrecord.domain.BuildRecord;
|
||||
import me.zhengjie.modules.taskrecord.domain.TaskRecord;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@Data
|
||||
public class TaskRecordBuildVO implements Serializable {
|
||||
public class TaskBuildRecordVO implements Serializable {
|
||||
|
||||
|
||||
/**
|
@ -1,4 +1,4 @@
|
||||
package me.zhengjie.task;
|
||||
package me.zhengjie.modules.taskrecord.task;
|
||||
|
||||
import cn.hutool.system.OsInfo;
|
||||
import cn.hutool.system.SystemUtil;
|
@ -1,4 +1,4 @@
|
||||
package me.zhengjie.task;
|
||||
package me.zhengjie.modules.taskrecord.task;
|
||||
|
||||
import cn.hutool.system.OsInfo;
|
||||
import cn.hutool.system.SystemUtil;
|
@ -0,0 +1,36 @@
|
||||
package me.zhengjie.modules.taskrecord.task.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 同于任务的DTO模型Bean
|
||||
*/
|
||||
@Data
|
||||
public class SendRecordDTO {
|
||||
|
||||
/**
|
||||
* 需要进行发送课包的Id列表
|
||||
*/
|
||||
private Integer id;
|
||||
|
||||
/**
|
||||
* 该次发送的个数限
|
||||
*/
|
||||
private Long limit;
|
||||
|
||||
/**
|
||||
* 发送的活动任务Id,对应其他平台的活动Id库
|
||||
*/
|
||||
private Long taskId;
|
||||
|
||||
/**
|
||||
* 对应下游推送环境标识, 占时用数字来进行标识 0 标识为测试环境,1-5为正式环境
|
||||
*/
|
||||
private String addressTag;
|
||||
|
||||
/**
|
||||
* 需要发送的活动名称
|
||||
*/
|
||||
private String sendName;
|
||||
|
||||
}
|
Loading…
Reference in New Issue