批量请求数据

master
bynt 9 months ago
parent 9fceb523cc
commit b5acdf1c3e

@ -15,7 +15,9 @@
*/
package me.zhengjie.utils;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONException;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import eu.bitwalker.useragentutils.Browser;
@ -75,10 +77,10 @@ public class StringUtils extends org.apache.commons.lang3.StringUtils {
* @param specialTag
* @return
*/
public static<T> String listPrintWithSpecialSplit(List<T> list, String specialTag){
public static <T> String listPrintWithSpecialSplit(List<T> list, String specialTag) {
String str = list.toString().replaceAll("(?:\\[|null|]| +)", "");
if (StringUtils.isNotBlank(specialTag)){
if (StringUtils.isNotBlank(specialTag)) {
return str.replaceAll(",", specialTag);
}
return str;
@ -215,8 +217,15 @@ public class StringUtils extends org.apache.commons.lang3.StringUtils {
*/
public static String getHttpCityInfo(String ip) {
String api = String.format(ElAdminConstant.Url.IP_URL, ip);
JSONObject object = JSONUtil.parseObj(HttpUtil.get(api));
String string = StrUtil.EMPTY;
try {
string = HttpUtil.get(api);
JSONObject object = JSONUtil.parseObj(string);
return object.get("addr", String.class);
} catch (JSONException jsonException) {
log.error("=========== parse json error body as {} ============", string);
}
return StrUtil.EMPTY;
}
/**

@ -151,4 +151,6 @@ public class LogServiceImpl implements LogService {
public void delAllByInfo() {
logRepository.deleteByLogType("INFO");
}
}

@ -9,7 +9,7 @@ import org.springframework.stereotype.Component;
@Generated(
value = "org.mapstruct.ap.MappingProcessor",
date = "2023-12-21T16:28:48+0800",
date = "2023-12-26T11:47:03+0800",
comments = "version: 1.3.1.Final, compiler: javac, environment: Java 1.8.0_251 (Oracle Corporation)"
)
@Component

@ -9,7 +9,7 @@ import org.springframework.stereotype.Component;
@Generated(
value = "org.mapstruct.ap.MappingProcessor",
date = "2023-12-21T16:28:48+0800",
date = "2023-12-26T11:47:03+0800",
comments = "version: 1.3.1.Final, compiler: javac, environment: Java 1.8.0_251 (Oracle Corporation)"
)
@Component

@ -1,5 +1,6 @@
package me.zhengjie.config.thread;
import me.zhengjie.modules.constant.DefaultConstant;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
@ -36,7 +37,7 @@ public class ThreadPoolConfig {
@Bean(value = "SendBigDataTaskExecutor")
public Executor SendBigDataTaskExecutor(){
public Executor SendBigDataTaskExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
saveCorePoolSize,
saveMaxPoolSize,
@ -49,4 +50,12 @@ public class ThreadPoolConfig {
return threadPoolExecutor;
}
@Bean("workExecutor")
public Executor getExecutor() {
return new ThreadPoolExecutor(10, Integer.MAX_VALUE,
DefaultConstant.THREE_NUMBER, TimeUnit.MINUTES,
new LinkedBlockingDeque<>(saveQueueCapacity),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
}
}

@ -19,6 +19,7 @@ import com.google.common.collect.Sets;
import com.jcraft.jsch.Session;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import me.zhengjie.config.thread.ThreadPoolConfig;
import me.zhengjie.exception.BadRequestException;
import me.zhengjie.modules.constant.DefaultConstant;
import me.zhengjie.modules.constant.FileConstant;
@ -42,6 +43,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import java.io.File;
import java.io.IOException;
@ -51,9 +53,11 @@ import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import static me.zhengjie.modules.upload.consts.SysConst.*;
@ -117,6 +121,11 @@ public class SaveToFileNewTask {
@Autowired
private TbUploadFileNewService tbUploadFileNewService;
@Autowired
ThreadPoolConfig threadConfig;
@Async(value = "SendBigDataTaskExecutor")
public void doRunTask(TbUploadFileNewDto tbUploadFileNewDto) {
Long satrtMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
@ -181,8 +190,10 @@ public class SaveToFileNewTask {
if (CollUtil.isNotEmpty(decryptionList)) {
List<String> stringList = decryptFile(decryptionList);
if (CollUtil.isNotEmpty(stringList)) {
// 替换集合
phoneList = stringList;
// 替换集合并去除空格
phoneList = stringList.stream().filter
(StringUtils::isNotBlank).collect(Collectors.toList());
log.info("============ the phone list as {} ============",phoneList.size());
}
}
}
@ -190,7 +201,7 @@ public class SaveToFileNewTask {
List<String> list = preEncryptNumMap.get(PRE_SEND_NUM_LENGTH);
phoneList = Lists.newArrayList(Sets.newHashSet(list));
}
if (CollectionUtil.isNotEmpty(phoneList)) {
if (CollUtil.isNotEmpty(phoneList)) {
log.info("============= the random number as {} =============", Base64.encode(RandomUtil.randomEle(phoneList)));
batchSendToEncrypt(filePath, phoneList);
}
@ -221,10 +232,20 @@ public class SaveToFileNewTask {
return DefaultConstant.ZERO_NUMBER;
}
@SneakyThrows
private List<String> decryptFile(List<String> phoneList) {
StopWatch stopWatch = new StopWatch();
List<String> stringList = Lists.newArrayList();
// 取出线程池
Executor executor = threadConfig.getExecutor();
List<CompletableFuture<List<String>>> results = Lists.newArrayList();
List<List<String>> partition = Lists.partition(phoneList, SEND_ENCRYPT_LIMIT);
int batchNum = partition.size();
stopWatch.start();
for (List<String> list : partition) {
TimeUnit.MILLISECONDS.sleep(300L);
// 异步线程池
CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(() -> {
// 转换
String decryptStr = StringUtils.listPrintWithSpecialSplit(list, null);
// 构造请求
@ -233,17 +254,24 @@ public class SaveToFileNewTask {
(decryptStr).requestId(RandomUtil.randomString(TEN_NUM)).build();
// 调用HTTP请求发送数据
DecryptionDTO decryptionDTO = JSONUtil.toBean(HttpUtil.post(decryptAddress, JSON.toJSONString(build)), DecryptionDTO.class);
if (StringUtils.isNotBlank(decryptionDTO.getReason())
&& "success".equals(decryptionDTO.getReason())) {
if (StringUtils.isNotBlank(decryptionDTO.getCellPhone())) {
if (org.apache.commons.lang3.StringUtils.isNotBlank(decryptionDTO.getReason())
&& "success".equals(decryptionDTO.getReason()) && (org.apache.commons.lang3.StringUtils.isNotBlank(decryptionDTO.getCellPhone()))) {
List<String> parseArray = JSON.parseArray(decryptionDTO.getCellPhone(), String.class);
if (!parseArray.isEmpty()) {
stringList.addAll(parseArray);
log.info("================ response size as {} ================", parseArray.size());
return parseArray;
}
}
log.info("============== the response as {} ===========",JSONUtil.toJsonStr(decryptionDTO));
return Lists.newArrayList();
}, executor);
results.add(future);
}
}
// 线程结束取出结果集
CompletableFuture<Void> allCompletableFuture = CompletableFuture.allOf(results.toArray(new CompletableFuture[batchNum]));
List<List<String>> result = allCompletableFuture.thenApply(e -> results.stream().map(CompletableFuture::join).collect(Collectors.toList())).join();
result.forEach(stringList::addAll);
stopWatch.stop();
log.info("================ send decryption info result size as {} ================", stringList.size());
return stringList;
}
@ -333,7 +361,7 @@ public class SaveToFileNewTask {
String responseFilePath = filePath + TEMP_FILE_END_STR;
// 写入指定路径
writeToFile(stringList, responseFilePath);
log.info("================ 334 response size as {} ================", stringList.size());
log.info("================ response size as {} ================", stringList.size());
return Boolean.TRUE;
}
return Boolean.FALSE;

Loading…
Cancel
Save