定时压缩投放数据
parent
54a4f1b6a9
commit
3626c8e0eb
@ -0,0 +1,227 @@
|
||||
package com.baiyee.adcallback.task;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.text.csv.CsvWriter;
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.baiyee.adcallback.api.dto.TfSourceDTO;
|
||||
import com.baiyee.adcallback.common.enums.TableEnum;
|
||||
import com.baiyee.adcallback.common.util.FileUtil;
|
||||
import com.baiyee.adcallback.repository.*;
|
||||
import com.baiyee.adcallback.repository.entity.TbTfSourceEntity;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.lingala.zip4j.core.ZipFile;
|
||||
import net.lingala.zip4j.exception.ZipException;
|
||||
import net.lingala.zip4j.model.ZipParameters;
|
||||
import net.lingala.zip4j.util.Zip4jConstants;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.PersistenceContext;
|
||||
import javax.persistence.Query;
|
||||
import java.io.File;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author wjt
|
||||
* @date 2023/6/30
|
||||
* <p>
|
||||
* 将每天产生得数据压缩成文件
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class TfSourceTaskNew {
|
||||
private final TbJLV2BackdataEntityRepository tbJLV2BackdataEntityRepository;
|
||||
private final TbJLV1BackdataEntityRepository tbJLV1BackdataEntityRepository;
|
||||
private final TbGdtNewBackdataEntityRepository tbGdtNewBackdataEntityRepository;
|
||||
private final TbKSNewBackdataEntityRepository tbKSNewBackdataEntityRepository;
|
||||
private final TbBDNewBackdataEntityRepository tbBDNewBackdataEntityRepository;
|
||||
private final TbTfSourceEntityRepository tbTfSourceEntityRepository;
|
||||
@PersistenceContext
|
||||
private EntityManager entityManager;
|
||||
@Value("${tf.source.path}")
|
||||
private String path;
|
||||
|
||||
@Scheduled(cron = "0 0 1 * * ? ")
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void compressSourceJlV1() {
|
||||
log.info("++++++++++++++++++++++source jlv1 compress start time {} ++++++++++++++++", DateUtil.now());
|
||||
DateTime startTime = DateUtil.beginOfDay(DateUtil.yesterday());
|
||||
DateTime endTime = DateUtil.endOfDay(DateUtil.yesterday());
|
||||
Integer total = tbJLV1BackdataEntityRepository.countByGmtCreate(startTime, endTime);
|
||||
saveData(total, startTime, endTime, TableEnum.JL1.getTableName());
|
||||
log.info("++++++++++++++++++++++TfSource jlv1 Sync end time {} ++++++++++++++++", DateUtil.now());
|
||||
}
|
||||
|
||||
@Scheduled(cron = "0 0 2 * * ? ")
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void compressSourceJlV2() {
|
||||
log.info("++++++++++++++++++++++source jlv2 compress start time {} ++++++++++++++++", DateUtil.now());
|
||||
DateTime startTime = DateUtil.beginOfDay(DateUtil.yesterday());
|
||||
DateTime endTime = DateUtil.endOfDay(DateUtil.yesterday());
|
||||
Integer total = tbJLV2BackdataEntityRepository.countByGmtCreate(startTime, endTime);
|
||||
saveData(total, startTime, endTime, TableEnum.JL2.getTableName());
|
||||
log.info("++++++++++++++++++++++TfSource jlv2 Sync end time {} ++++++++++++++++", DateUtil.now());
|
||||
}
|
||||
|
||||
@Scheduled(cron = "0 0 3 * * ? ")
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void compressSourceGdt() {
|
||||
log.info("++++++++++++++++++++++source gdt compress start time {} ++++++++++++++++", DateUtil.now());
|
||||
DateTime startTime = DateUtil.beginOfDay(DateUtil.yesterday());
|
||||
DateTime endTime = DateUtil.endOfDay(DateUtil.yesterday());
|
||||
Integer total = tbGdtNewBackdataEntityRepository.countByGmtCreate(startTime, endTime);
|
||||
saveData(total, startTime, endTime, TableEnum.GDT_NEW.getTableName());
|
||||
log.info("++++++++++++++++++++++TfSource gdt Sync end time {} ++++++++++++++++", DateUtil.now());
|
||||
}
|
||||
|
||||
@Scheduled(cron = "0 0 4 * * ? ")
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void compressSourceKs() {
|
||||
log.info("++++++++++++++++++++++source ks compress start time {} ++++++++++++++++", DateUtil.now());
|
||||
DateTime startTime = DateUtil.beginOfDay(DateUtil.yesterday());
|
||||
DateTime endTime = DateUtil.endOfDay(DateUtil.yesterday());
|
||||
Integer total = tbKSNewBackdataEntityRepository.countByGmtCreate(startTime, endTime);
|
||||
saveData(total, startTime, endTime, TableEnum.KS_NEW.getTableName());
|
||||
log.info("++++++++++++++++++++++TfSource ks Sync end time {} ++++++++++++++++", DateUtil.now());
|
||||
}
|
||||
|
||||
@Scheduled(cron = "0 0 5 * * ? ")
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void compressSourceBd() {
|
||||
log.info("++++++++++++++++++++++source bd compress start time {} ++++++++++++++++", DateUtil.now());
|
||||
DateTime startTime = DateUtil.beginOfDay(DateUtil.yesterday());
|
||||
DateTime endTime = DateUtil.endOfDay(DateUtil.yesterday());
|
||||
Integer total = tbBDNewBackdataEntityRepository.countByGmtCreate(startTime, endTime);
|
||||
saveData(total, startTime, endTime, TableEnum.BD_NEW.getTableName());
|
||||
log.info("++++++++++++++++++++++TfSource bd Sync end time {} ++++++++++++++++", DateUtil.now());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 压缩记录入库,每次压缩5万
|
||||
*/
|
||||
private void saveData(Integer total, DateTime startTime, DateTime endTime, String dataSourceName) {
|
||||
if (total <= 0) {
|
||||
return;
|
||||
}
|
||||
int startNum = 0;
|
||||
int num = 100000;
|
||||
int circulateNum = 0;
|
||||
Map<String, List<String>> map = new HashMap<>(16);
|
||||
while (startNum < total) {
|
||||
List<TfSourceDTO> tfSourceDTOS = queryData(dataSourceName, startTime, endTime, startNum, num);
|
||||
if (CollUtil.isEmpty(tfSourceDTOS)) {
|
||||
return;
|
||||
}
|
||||
//生成临时文件
|
||||
generateFile(tfSourceDTOS, map, circulateNum);
|
||||
circulateNum += 1;
|
||||
startNum += num;
|
||||
}
|
||||
String format = DateUtil.format(DateUtil.yesterday(), "yyyy-MM-dd");
|
||||
String filePath = path + File.separator + format;
|
||||
List<TbTfSourceEntity> saveList = new ArrayList<>();
|
||||
|
||||
for (Map.Entry<String, List<String>> entry : map.entrySet()) {
|
||||
String password = RandomUtil.randomString(6);
|
||||
String newPath = zipFile(filePath + File.separator + entry.getKey() + ".zip", entry.getValue(), password);
|
||||
if (StrUtil.isNotBlank(filePath)) {
|
||||
//保存文件记录
|
||||
TbTfSourceEntity tbTfSourceEntity = new TbTfSourceEntity().addTbTfSourceEntity(format, null, null, entry.getKey(), newPath, password);
|
||||
saveList.add(tbTfSourceEntity);
|
||||
}
|
||||
}
|
||||
tbTfSourceEntityRepository.saveAll(saveList);
|
||||
}
|
||||
|
||||
|
||||
public void generateFile(List<TfSourceDTO> list, Map<String, List<String>> map, int circulateNum) {
|
||||
//tag分组
|
||||
HashMap<String, List<TfSourceDTO>> mapByTag = new HashMap<>(list.stream().collect(Collectors.groupingBy(TfSourceDTO::getTag, Collectors.toList())));
|
||||
String format = DateUtil.format(DateUtil.yesterday(), "yyyy-MM-dd");
|
||||
//生成文件
|
||||
for (String tag : mapByTag.keySet()) {
|
||||
String filePath = path + File.separator + format + File.separator + tag + circulateNum + ".csv";
|
||||
File file = new File(filePath);
|
||||
CsvWriter writer = new CsvWriter(file, StandardCharsets.UTF_8);
|
||||
writer.writeBeans(mapByTag.get(tag));
|
||||
writer.flush();
|
||||
writer.close();
|
||||
List<String> mapValueList;
|
||||
if (map.containsKey(tag)) {
|
||||
mapValueList = map.get(tag);
|
||||
} else {
|
||||
mapValueList = new ArrayList<>();
|
||||
}
|
||||
mapValueList.add(filePath);
|
||||
map.put(tag, mapValueList);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成加密压缩包
|
||||
*
|
||||
* @param newPath 新地址
|
||||
* @param srcPath 临时地址
|
||||
* @param password 密码
|
||||
*/
|
||||
private String zipFile(String newPath, List<String> srcPath, String password) {
|
||||
ZipParameters zipParameters = new ZipParameters();
|
||||
zipParameters.setEncryptionMethod(Zip4jConstants.COMP_DEFLATE);
|
||||
zipParameters.setCompressionLevel(Zip4jConstants.DEFLATE_LEVEL_NORMAL);
|
||||
if (StrUtil.isNotBlank(password)) {
|
||||
zipParameters.setEncryptFiles(true);
|
||||
zipParameters.setEncryptionMethod(Zip4jConstants.ENC_METHOD_STANDARD);
|
||||
zipParameters.setPassword(password.toCharArray());
|
||||
}
|
||||
try {
|
||||
ZipFile zipFile = new ZipFile(newPath);
|
||||
ArrayList<File> files = new ArrayList<File>();
|
||||
srcPath.forEach(path -> {
|
||||
files.add(new File(path));
|
||||
});
|
||||
zipFile.addFiles(files, zipParameters);
|
||||
for (String path : srcPath) {
|
||||
FileUtil.del(path);
|
||||
}
|
||||
return newPath;
|
||||
} catch (ZipException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private List<TfSourceDTO> queryData(String tableName, DateTime startTime, DateTime endTime, Integer startNum, Integer endNum) {
|
||||
String sql = "select imei,idfa,mac,oaid,tag,vpoint FROM " + tableName +
|
||||
" where imei !='__IMEI__' and gmt_create > :startTime and gmt_create < :endTime ORDER BY id desc LIMIT :startNum , :endNum";
|
||||
Query selectQuery = entityManager.createNativeQuery(sql);
|
||||
selectQuery.setParameter("startTime", DateUtil.format(startTime, "yyyy-MM-dd HH:mm:ss"));
|
||||
selectQuery.setParameter("endTime", DateUtil.format(endTime, "yyyy-MM-dd HH:mm:ss"));
|
||||
selectQuery.setParameter("startNum", startNum);
|
||||
selectQuery.setParameter("endNum", endNum);
|
||||
List list = selectQuery.getResultList();
|
||||
List<TfSourceDTO> tfSourceDTOS = new ArrayList<>();
|
||||
for (Object row : list) {
|
||||
Object[] cells = (Object[]) row;
|
||||
TfSourceDTO source = new TfSourceDTO();
|
||||
source.setImei((String) cells[0]);
|
||||
source.setIdfa((String) cells[1]);
|
||||
source.setMac((String) cells[2]);
|
||||
source.setOaid((String) cells[3]);
|
||||
source.setTag((String) cells[4]);
|
||||
source.setVpoint((Integer) cells[5]);
|
||||
tfSourceDTOS.add(source);
|
||||
}
|
||||
return tfSourceDTOS;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue