异步文件处理

master
yqy 3 years ago
parent 8ba0569459
commit 2a650960e7

@ -35,10 +35,11 @@ public class FeignConfiguration implements RequestInterceptor {
} }
Map<String, String> headerMap = RequestHeaderHandler.getHeaderMap(); Map<String, String> headerMap = RequestHeaderHandler.getHeaderMap();
if (CollUtil.isNotEmpty(headerMap)) { if (CollUtil.isNotEmpty(headerMap)) {
headerMap.forEach((headerName, headerValue) -> { for (Map.Entry<String, String> headerEntry : headerMap.entrySet()) {
log.debug("《FeignRequestInterceptor》 headerName:【{}】 headerValue:【{}】", headerName, headerValue); if (headerEntry.getKey().equalsIgnoreCase(SecurityConstants.AUTHORIZATION)) {
requestTemplate.header(headerName, headerValue); requestTemplate.header(headerEntry.getKey(), headerEntry.getValue());
}); }
}
} }
log.debug("========================== ↑↑↑↑↑↑ 《FeignRequestInterceptor》 End... ↑↑↑↑↑↑ =========================="); log.debug("========================== ↑↑↑↑↑↑ 《FeignRequestInterceptor》 End... ↑↑↑↑↑↑ ==========================");
} }

@ -73,7 +73,9 @@ public class TaskServiceImpl implements TaskService {
} }
//设置默认值 //设置默认值
task.setIsDistribution(DefaultNumberConstants.ZERO_NUMBER); task.setIsDistribution(DefaultNumberConstants.ZERO_NUMBER);
task.setCreateBy(SecurityUtils.getCurrentUserId()); if (task.getCreateBy() == null){
task.setCreateBy(SecurityUtils.getCurrentUserId());
}
taskRepository.save(task); taskRepository.save(task);
return new ResponseEntity<>(CommonResponse.createBySuccess(ResponseCode.SUCCESS), HttpStatus.OK); return new ResponseEntity<>(CommonResponse.createBySuccess(ResponseCode.SUCCESS), HttpStatus.OK);
} }
@ -125,6 +127,9 @@ public class TaskServiceImpl implements TaskService {
if (task.getTotalNumber() != null){ if (task.getTotalNumber() != null){
updateTask.setTotalNumber(task.getTotalNumber()); updateTask.setTotalNumber(task.getTotalNumber());
} }
if (task.getIsDistribution() != null){
updateTask.setIsDistribution(task.getIsDistribution());
}
taskRepository.save(updateTask); taskRepository.save(updateTask);
}else { }else {
throw new BadRequestException("任务id不能为空"); throw new BadRequestException("任务id不能为空");

@ -52,6 +52,7 @@ public class ReportSync {
*/ */
@Scheduled(cron = "0 0 23 * * ? ") @Scheduled(cron = "0 0 23 * * ? ")
public void countReport() { public void countReport() {
log.info("++++++++++++++++++++++countReport Sync start time {} ++++++++++++++++", DateUtil.now());
String beginOfDay = DateUtil.formatDateTime(DateUtil.yesterday()); String beginOfDay = DateUtil.formatDateTime(DateUtil.yesterday());
String endOfDay = DateUtil.formatDateTime(DateUtil.date()); String endOfDay = DateUtil.formatDateTime(DateUtil.date());
//查询今天的线索情况 //查询今天的线索情况
@ -63,6 +64,7 @@ public class ReportSync {
autoCountReportByTask(beginOfDay, endOfDay, callClueInfos); autoCountReportByTask(beginOfDay, endOfDay, callClueInfos);
//按人员统计 //按人员统计
autoCountReportByUser(beginOfDay, endOfDay, callClueInfos); autoCountReportByUser(beginOfDay, endOfDay, callClueInfos);
log.info("++++++++++++++++++++++countReport Sync end time {} ++++++++++++++++", DateUtil.now());
} }
private void autoCountReportByTask(String beginOfDay, String endOfDay, List<CallClueInfo> callClueInfos) { private void autoCountReportByTask(String beginOfDay, String endOfDay, List<CallClueInfo> callClueInfos) {

@ -129,4 +129,7 @@ public interface ClueMiddleRepository extends JpaRepository<ClueMiddle, Long>, J
@Modifying @Modifying
@Query(value = " update tb_clue_middle set member_id = ?2, member_status = 1 where clue_id in ?1 ", nativeQuery = true) @Query(value = " update tb_clue_middle set member_id = ?2, member_status = 1 where clue_id in ?1 ", nativeQuery = true)
void updateMemberIdByClueIdIn(List<Long> clueIdList, Long userId); void updateMemberIdByClueIdIn(List<Long> clueIdList, Long userId);
@Query(value = "select clue_id from tb_clue_middle where task_id = ?1", nativeQuery = true)
Set<Long> findByTaskIdReturnClueId(Long taskId);
} }

@ -3,6 +3,8 @@ package com.baiye.module.dao;
import com.baiye.module.entity.ClueRecord; import com.baiye.module.entity.ClueRecord;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor; import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
import java.util.List; import java.util.List;
@ -23,4 +25,8 @@ public interface ClueRecordRepository extends JpaRepository<ClueRecord, Long>, J
* @param taskIds * @param taskIds
*/ */
void deleteByTaskIdIn(Set<Long> taskIds); void deleteByTaskIdIn(Set<Long> taskIds);
@Modifying
@Query(value = " update tb_clue_record set status = 0 where task_id = ?1 ", nativeQuery = true)
void updateType(Long taskId);
} }

@ -60,15 +60,19 @@ public class ClueServiceImpl implements ClueService {
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void saveClue(List<Clue> clueList, Long taskId) { public void saveClue(List<Clue> clueList, Long taskId) {
if (clueList.size() > 0) { if (clueList.size() > 0) {
List<Clue> clueListAll = clueRepository.saveAll(clueList); try {
//插入中间表数据 List<Clue> clueListAll = clueRepository.saveAll(clueList);
for (Clue clue : clueListAll) { //插入中间表数据
ClueMiddle clueMiddle = new ClueMiddle(); for (Clue clue : clueListAll) {
clueMiddle.setClueId(clue.getId()); ClueMiddle clueMiddle = new ClueMiddle();
clueMiddle.setTaskId(taskId); clueMiddle.setClueId(clue.getId());
clueMiddle.setMemberStatus(DefaultNumberConstants.ZERO_NUMBER); clueMiddle.setTaskId(taskId);
clueMiddle.setOptimisticVersion(DefaultNumberConstants.ZERO_NUMBER); clueMiddle.setMemberStatus(DefaultNumberConstants.ZERO_NUMBER);
clueMiddleRepository.save(clueMiddle); clueMiddle.setOptimisticVersion(DefaultNumberConstants.ZERO_NUMBER);
clueMiddleRepository.save(clueMiddle);
}
}catch (Exception e){
throw new BadRequestException("===================插入资源发生错误===================");
} }
} }
} }
@ -184,14 +188,14 @@ public class ClueServiceImpl implements ClueService {
Map<String, List<String>> mapList = organizeClient.getLabel(clueDto).getBody(); Map<String, List<String>> mapList = organizeClient.getLabel(clueDto).getBody();
List<String> organizeLabel = mapList.get("organizeLabel"); List<String> organizeLabel = mapList.get("organizeLabel");
List<String> baseLabel = mapList.get("baseLabel"); List<String> baseLabel = mapList.get("baseLabel");
clueDto.setBaseLabel(baseLabel);
clueDto.setOrganizeLabel(organizeLabel);
if (CollUtil.isEmpty(organizeLabel)){ if (CollUtil.isEmpty(organizeLabel)){
clueDto.setOrganizeLabel(new ArrayList<>()); clueDto.setOrganizeLabel(new ArrayList<>());
} }
if (CollUtil.isEmpty(baseLabel)){ if (CollUtil.isEmpty(baseLabel)){
clueDto.setBaseLabel(new ArrayList<>()); clueDto.setBaseLabel(new ArrayList<>());
} }
clueDto.setBaseLabel(baseLabel);
clueDto.setOrganizeLabel(organizeLabel);
} }
} }
return clueDtoList; return clueDtoList;
@ -350,7 +354,6 @@ public class ClueServiceImpl implements ClueService {
task.setTaskName(taskName); task.setTaskName(taskName);
task.setTotalNumber(taskNum); task.setTotalNumber(taskNum);
task.setTaskType(DefaultNumberConstants.ZERO_NUMBER); task.setTaskType(DefaultNumberConstants.ZERO_NUMBER);
taskClient.saveTask(task);
try { try {
taskClient.saveTask(task); taskClient.saveTask(task);
} catch (Exception e) { } catch (Exception e) {

@ -122,9 +122,8 @@ public class UploadFileServiceImpl implements UploadFileService {
ClueRecord clueRecord = saveClueRecord(name, newFileUrl, uploadType, userId, taskId); ClueRecord clueRecord = saveClueRecord(name, newFileUrl, uploadType, userId, taskId);
clueRecordList.add(clueRecord); clueRecordList.add(clueRecord);
} }
//异步读取文件并创建任务 //异步读取文件并创建任务
fileAnalysisTask.runFileAnalysisTask(clueRecordList, oneFileName, RequestContextUtil.getHeaderMap()); fileAnalysisTask.runFileAnalysisTask(clueRecordList, oneFileName);
return new ResponseEntity<>(CommonResponse.createBySuccess(ResponseCode.SUCCESS), HttpStatus.OK); return new ResponseEntity<>(CommonResponse.createBySuccess(ResponseCode.SUCCESS), HttpStatus.OK);
} }
@ -166,6 +165,7 @@ public class UploadFileServiceImpl implements UploadFileService {
Task task = new Task(); Task task = new Task();
task.setId(taskId); task.setId(taskId);
task.setTotalNumber(taskNum); task.setTotalNumber(taskNum);
task.setIsDistribution(1);
taskClient.updateTask(task); taskClient.updateTask(task);
} }
return new ResponseEntity<>(CommonResponse.createBySuccess(ResponseCode.SUCCESS), HttpStatus.OK); return new ResponseEntity<>(CommonResponse.createBySuccess(ResponseCode.SUCCESS), HttpStatus.OK);

@ -1,9 +1,13 @@
package com.baiye.task; package com.baiye.task;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.excel.EasyExcelFactory; import com.alibaba.excel.EasyExcelFactory;
import com.baiye.exception.BadRequestException;
import com.baiye.feign.TaskClient;
import com.baiye.module.constant.FileConstant; import com.baiye.module.constant.FileConstant;
import com.baiye.module.dao.ClueJpa; import com.baiye.module.dao.ClueJpa;
import com.baiye.module.dao.ClueMiddleRepository; import com.baiye.module.dao.ClueMiddleRepository;
import com.baiye.module.dao.ClueRecordRepository;
import com.baiye.module.dao.ClueRepository; import com.baiye.module.dao.ClueRepository;
import com.baiye.module.entity.Clue; import com.baiye.module.entity.Clue;
import com.baiye.module.entity.ClueRecord; import com.baiye.module.entity.ClueRecord;
@ -24,7 +28,9 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -35,56 +41,59 @@ public class FileAnalysisTask {
@Resource @Resource
private ClueService clueService; private ClueService clueService;
@Resource @Resource
private ClueJpa clueJpa; private ClueJpa clueJpa;
@Resource @Resource
private ClueRepository clueRepository; private ClueRepository clueRepository;
@Resource @Resource
private ClueMiddleRepository clueMiddleRepository; private ClueMiddleRepository clueMiddleRepository;
@Resource
private TaskClient taskClient;
/** /**
* *
*
* @param clueRecords * @param clueRecords
* @param headerMap
*/ */
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
@Async(value = "SendBigDataTaskExecutor") @Async(value = "SendBigDataTaskExecutor")
@SneakyThrows //处理异常try //@SneakyThrows
public void runFileAnalysisTask(List<ClueRecord> clueRecords, String oneFileName, Map<String, String> headerMap) { public void runFileAnalysisTask(List<ClueRecord> clueRecords, String oneFileName){
try {
for (ClueRecord clueRecord : clueRecords) { for (ClueRecord clueRecord : clueRecords) {
BaseExcelVo baseExcelVo = new BaseExcelVo(); BaseExcelVo baseExcelVo = new BaseExcelVo();
baseExcelVo.setUserId(clueRecord.getCreateBy()); baseExcelVo.setUserId(clueRecord.getCreateBy());
baseExcelVo.setClueRecordId(clueRecord.getId()); baseExcelVo.setClueRecordId(clueRecord.getId());
baseExcelVo.setOrigin(clueRecord.getUploadType()); baseExcelVo.setOrigin(clueRecord.getUploadType());
baseExcelVo.setTaskId(clueRecord.getTaskId()); baseExcelVo.setTaskId(clueRecord.getTaskId());
baseExcelVo.setOneFileName(oneFileName); baseExcelVo.setOneFileName(oneFileName);
FileInputStream file = new FileInputStream(clueRecord.getNewFileUrl()); FileInputStream file = new FileInputStream(clueRecord.getNewFileUrl());
switch (clueRecord.getUploadType()){ switch (clueRecord.getUploadType()) {
case FileConstant.ZERO_NUMBER: case FileConstant.ZERO_NUMBER:
log.info("==============开始导入自定义表单============{}",Thread.currentThread().getName()); log.info("==============开始导入自定义表单============{}", Thread.currentThread().getName());
EasyExcelFactory.read(file, ZdyVo.class, new ZdyExcelListener(clueService,baseExcelVo)).build().readAll(); EasyExcelFactory.read(file, ZdyVo.class, new ZdyExcelListener(clueService, baseExcelVo)).build().readAll();
break; break;
case FileConstant.ONE_NUMBER: case FileConstant.ONE_NUMBER:
log.info("==============开始导入UC表单============{}",Thread.currentThread().getName()); log.info("==============开始导入UC表单============{}", Thread.currentThread().getName());
new UcCsvListener(clueService,baseExcelVo).readUcCsv(clueRecord.getNewFileUrl()); new UcCsvListener(clueService, baseExcelVo).readUcCsv(clueRecord.getNewFileUrl());
break; break;
case FileConstant.TWO_NUMBER: case FileConstant.TWO_NUMBER:
log.info("==============开始导入快手表单============{}",Thread.currentThread().getName()); log.info("==============开始导入快手表单============{}", Thread.currentThread().getName());
EasyExcelFactory.read(file, KsVo.class, new KsExcelListener(clueService,baseExcelVo)).build().readAll(); EasyExcelFactory.read(file, KsVo.class, new KsExcelListener(clueService, baseExcelVo)).build().readAll();
break; break;
default: default:
break; break;
}
} }
} catch (Exception e) {
taskClient.sendErrMessage("导入资源错误,请检查数据格式后重新导入", clueRecords.get(0).getCreateBy());
throw new BadRequestException("=========读取资源错误============");
} }
//创建任务 //创建任务
Long taskId = clueRecords.get(0).getTaskId(); Long taskId = clueRecords.get(0).getTaskId();
Long userId = clueRecords.get(0).getCreateBy(); Long userId = clueRecords.get(0).getCreateBy();
if (taskId != null && userId != null && StringUtils.isNotBlank(oneFileName)){ if (taskId != null && userId != null && StringUtils.isNotBlank(oneFileName)) {
// 任务内手机号去重,先去重然后创建任务(统计任务数量) // 任务内手机号去重,先去重然后创建任务(统计任务数量)
distinctNid(taskId); distinctNid(taskId);
// 创建任务 // 创建任务
@ -92,24 +101,24 @@ public class FileAnalysisTask {
baseExcelVo.setUserId(userId); baseExcelVo.setUserId(userId);
baseExcelVo.setTaskId(taskId); baseExcelVo.setTaskId(taskId);
baseExcelVo.setOneFileName(oneFileName); baseExcelVo.setOneFileName(oneFileName);
RequestHeaderHandler.setHeaderMap(headerMap);
clueService.saveTask(baseExcelVo); clueService.saveTask(baseExcelVo);
} }
} }
/** /**
* *
*
* @param taskId * @param taskId
*/ */
@Transactional(propagation = Propagation.REQUIRED) @Transactional(propagation = Propagation.REQUIRED)
public void distinctNid(Long taskId){ public void distinctNid(Long taskId) {
Set<Long> taskIds = new HashSet(); Set<Long> taskIds = new HashSet();
taskIds.add(taskId); taskIds.add(taskId);
List<Clue> clueList = clueJpa.findClue(taskIds,null); List<Clue> clueList = clueJpa.findClue(taskIds, null);
Map<String, List<Clue>> map = clueList.stream().collect(Collectors.groupingBy(Clue::getNid)); Map<String, List<Clue>> map = clueList.stream().collect(Collectors.groupingBy(Clue::getNid));
Set<String> keySet = map.keySet(); Set<String> keySet = map.keySet();
for (String key : keySet) { for (String key : keySet) {
if (map.get(key).size() > 1){ if (map.get(key).size() > 1) {
List<Clue> clues = map.get(key); List<Clue> clues = map.get(key);
clues.remove(0); clues.remove(0);
Set<Long> clueIdSet = new HashSet<>(); Set<Long> clueIdSet = new HashSet<>();

@ -33,7 +33,7 @@ public class TestingUtil {
String zdyAmount = String.valueOf(list.get(3)); String zdyAmount = String.valueOf(list.get(3));
String zdyCollectTime = String.valueOf(list.get(4)); String zdyCollectTime = String.valueOf(list.get(4));
String zdyAddress = String.valueOf(list.get(5)); String zdyAddress = String.valueOf(list.get(5));
if (!(zdyName.equals("姓名") && zdyNid.equals("电话") && zdyWx.equals("微信") && zdyCollectTime.equals("日期") if (!(zdyName.equals("姓名") && zdyNid.equals("电话") && zdyWx.equals("微信") && zdyCollectTime.equals("日期(****/**/** 00:00:00)")
&& zdyAddress.equals("线索归属地") && zdyAmount.equals("金额"))) { && zdyAddress.equals("线索归属地") && zdyAmount.equals("金额"))) {
log.info("======================自定义单内容格式不正确====================="); log.info("======================自定义单内容格式不正确=====================");
return false; return false;
@ -65,7 +65,7 @@ public class TestingUtil {
String taskName = String.valueOf(list.get(0)); String taskName = String.valueOf(list.get(0));
String taskNid = String.valueOf(list.get(1)); String taskNid = String.valueOf(list.get(1));
String taskCollectTime = String.valueOf(list.get(2)); String taskCollectTime = String.valueOf(list.get(2));
if (!(taskName.equals("姓名") && taskNid.equals("电话") && taskCollectTime.equals("日期"))) { if (!(taskName.equals("姓名") && taskNid.equals("电话") && taskCollectTime.equals("日期(****/**/** 00:00:00)"))) {
log.info("======================自定义单内容格式不正确====================="); log.info("======================自定义单内容格式不正确=====================");
return false; return false;
} }

@ -63,8 +63,8 @@ spring:
storage: storage:
url: /usr/local/webapp/source/files/ url: /usr/local/webapp/source/files/
de_symbol: / de_symbol: /
download-template: 118.178.137.129/api-source/download/custom.xlsx download-template: http://118.178.137.129:8001/source/download/custom.xlsx
download-template-move: 118.178.137.129/api-source/download/dynamictemplate.xlsx download-template-move: http://118.178.137.129:8001/source/download/dynamictemplate.xlsx
# 线程池配置 # 线程池配置
save: save:

Loading…
Cancel
Save