@ -20,6 +20,7 @@ import me.zhengjie.modules.taskrecord.service.TaskRecordService;
import me.zhengjie.modules.taskrecord.service.dto.TaskRecordDto ;
import me.zhengjie.modules.taskrecord.service.dto.TaskRecordQueryCriteria ;
import me.zhengjie.utils.ConvertUtil ;
import me.zhengjie.utils.FileUtil ;
import me.zhengjie.utils.HttpUtil ;
import me.zhengjie.utils.StringUtils ;
import org.springframework.beans.factory.annotation.Autowired ;
@ -31,23 +32,23 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component ;
import java.io.IOException ;
import java.nio.charset.Charset ;
import java.nio.file.Files ;
import java.nio.file.Paths ;
import java.time.LocalDateTime ;
import java.time.ZoneOffset ;
import java.util.* ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.concurrent.atomic.AtomicLong ;
import java.util.stream.Collectors ;
import static me.zhengjie.modules.common.consts.SendBigDataConst.NON_FINISH_SEND_STATATUS ;
@Component
@Scope ( "prototype" )
@Slf4j
public class SendBigDataTask {
/ * *
* 未 发 送 的 状 态
* /
private static final int NON_FINISH_SEND_STATATUS = 0 ;
/ * *
* 每 次 发 送 条 数 限 制
@ -94,6 +95,7 @@ public class SendBigDataTask {
}
private void runTask ( BuildRecord resource , SendBigDataDTO sendBigDataDTO ) {
// 根据发送任务的Id来读取发送号码表
Integer id = resource . getId ( ) ;
log . info ( "id: {} " , id ) ;
@ -102,56 +104,38 @@ public class SendBigDataTask {
return ;
}
BuildRecordDto buildRecordDto = buildRecordService . findById ( id ) ;
buildRecordDto . setTaskBuildId ( resource . getTaskBuildId ( ) ) ;
// 查询记录表,返回这个id去发送记录表中进行查找 - 构建发送体
TagQueryCriteria tagQueryCriteria = new TagQueryCriteria ( ) ;
tagQueryCriteria . setTaskId ( Long . valueOf ( id ) ) ;
tagQueryCriteria . setPushStatus ( NON_FINISH_SEND_STATATUS ) ;
// 加入分页防止查不动的情况 这里可以分页进行查询 - 目前不进行优化 - 为全查
Pageable pageable = PageRequest . of ( 0 , sendBigDataDTO . getLimit ( ) . intValue ( ) ) ;
List < Tag > backContent = tagService . queryAllBySlice ( tagQueryCriteria , pageable ) ;
// 遍历查询等待发送的列表
List < Tag > collect = backContent . stream ( )
. distinct ( )
. collect ( Collectors . toList ( ) ) ;
// 进行去重
String resultFilePath = null ;
String onlyName = sendBigDataDTO . getOnlyName ( ) ;
if ( StringUtils . isNotBlank ( onlyName ) ) {
TaskRecordQueryCriteria queryCriteria = new TaskRecordQueryCriteria ( ) ;
queryCriteria . setTaskName ( onlyName ) ;
List < TaskRecordDto > taskRecordDtos = taskRecordService . queryAll ( queryCriteria ) ;
if ( CollectionUtil . isNotEmpty ( taskRecordDtos ) & & taskRecordDtos . size ( ) = = 1 ) {
resultFilePath = taskRecordDtos . get ( 0 ) . getLocalFilePath ( ) ;
}
}
// 通过是否含有 BuildRecordDto-params 来区分来源
List < Tag > collect ;
/ * *
*
* modify by x , 2020 - 12 - 15
*
* 优 化 详 细 见
*
* @see me . zhengjie . StreamTest # testFilter ( )
* /
if ( StringUtils . isNotBlank ( resultFilePath ) ) {
Set < String > fileLines = new HashSet < > ( 50_0000 ) ;
try {
fileLines = new HashSet < > ( Files . readAllLines ( Paths . get ( resultFilePath ) ) ) ;
} catch ( IOException e ) {
log . error ( "================== {read file error , please check is , file path is : {} } ================================" , resultFilePath , e ) ;
}
// 过滤的集合
Set < String > filterCollect = collect . stream ( ) . parallel ( ) . map ( Tag : : getUid ) . filter ( fileLines : : contains ) . collect ( Collectors . toSet ( ) ) ;
if ( CollectionUtil . isNotEmpty ( filterCollect ) ) {
log . info ( "================== [Filter collect is ready, collect size is {} ] ================================" , filterCollect . size ( ) ) ;
collect = collect . stream ( ) . filter ( one - > ! filterCollect . contains ( one . getUid ( ) ) ) . collect ( Collectors . toList ( ) ) ;
}
List < String > remoteCollect ;
// 获取需要进行去重的任务路径
String resultFilePath = getPreFilterPath ( sendBigDataDTO ) ;
if ( StringUtils . isNotBlank ( buildRecordDto . getParams ( ) ) ) {
// 本地表库中的记录的源
collect = getTagsFromLocalDBRec ( resource , sendBigDataDTO , buildRecordDto ) ;
// 进行去重逻辑
collect = getNotDuplicateContent ( collect , resultFilePath ) ;
// 批量发送并且更新记录内容
// 对需要发送的字段进行发送
AtomicLong atomicLong = batchSend ( collect , sendBigDataDTO , buildRecordDto ) ;
finishSendThenUpdateRec ( resource , buildRecordDto , atomicLong ) ;
} else {
// 远程文件存储形式的源
remoteCollect = getRemoteRecFromLocal ( buildRecordDto ) ;
// 进行去重逻辑
remoteCollect = getNotDuplicateRemoteContent ( remoteCollect , resultFilePath ) ;
// 批量发送信息并且更新记录
AtomicLong atomicLong = batchSendRemote ( remoteCollect , sendBigDataDTO , buildRecordDto ) ;
finishSendThenUpdateRec ( resource , buildRecordDto , atomicLong ) ;
}
// 乱序
// 乱序 ( 乱序的逻辑写在SQL语句中)
// Collections.shuffle(collect);
// 对需要发送的字段进行发送
AtomicLong atomicLong = batchSend ( collect , sendBigDataDTO , buildRecordDto ) ;
}
private void finishSendThenUpdateRec ( BuildRecord resource , BuildRecordDto buildRecordDto , AtomicLong atomicLong ) {
// 对发送后的状态进行更新
buildRecordDto . setIsSend ( FINISH_SEND_TAG ) ;
BuildRecord buildRecord = new BuildRecord ( ) ;
@ -160,7 +144,7 @@ public class SendBigDataTask {
// 之前要进行校验
Long dbCount = buildRecordDto . getSendTotal ( ) ;
// 这个数字会不准确和最开始的记录可能有冲突,这边加一个统计校验
Long sendCountFromDB = tagService . countSendSum ( Long . valueOf ( id ) , FINISH_SEND_TAG ) ;
Long sendCountFromDB = tagService . countSendSum ( Long . valueOf ( resource. getId ( ) ) , FINISH_SEND_TAG ) ;
if ( dbCount = = null ) {
dbCount = 0 L ;
}
@ -180,6 +164,87 @@ public class SendBigDataTask {
}
}
private List < String > getNotDuplicateRemoteContent ( List < String > remoteCollect , String resultFilePath ) {
if ( StringUtils . isNotBlank ( resultFilePath ) ) {
Set < String > fileLines = new HashSet < > ( 50_0000 ) ;
try {
fileLines = new HashSet < > ( Files . readAllLines ( Paths . get ( resultFilePath ) ) ) ;
} catch ( IOException e ) {
log . error ( "================== {read file error , please check is , file path is : {} } ================================" , resultFilePath , e ) ;
}
// 过滤的集合
Set < String > filterCollect = remoteCollect . stream ( ) . parallel ( ) . filter ( fileLines : : contains ) . collect ( Collectors . toSet ( ) ) ;
if ( CollectionUtil . isNotEmpty ( filterCollect ) ) {
log . info ( "================== [Filter collect is ready, collect size is {} ] ================================" , filterCollect . size ( ) ) ;
remoteCollect = remoteCollect . stream ( ) . filter ( one - > ! filterCollect . contains ( one ) ) . collect ( Collectors . toList ( ) ) ;
}
}
return remoteCollect ;
}
/ * *
* 进 行 去 重
* /
private List < Tag > getNotDuplicateContent ( List < Tag > collect , String resultFilePath ) {
/ * *
* modify by x , 2020 - 12 - 15
* 优 化 详 细 见
* @see me . zhengjie . StreamTest # testFilter ( )
* /
if ( StringUtils . isNotBlank ( resultFilePath ) ) {
Set < String > fileLines = new HashSet < > ( 50_0000 ) ;
try {
fileLines = new HashSet < > ( Files . readAllLines ( Paths . get ( resultFilePath ) ) ) ;
} catch ( IOException e ) {
log . error ( "================== {read file error , please check is , file path is : {} } ================================" , resultFilePath , e ) ;
}
// 过滤的集合
Set < String > filterCollect = collect . stream ( ) . parallel ( ) . map ( Tag : : getUid ) . filter ( fileLines : : contains ) . collect ( Collectors . toSet ( ) ) ;
if ( CollectionUtil . isNotEmpty ( filterCollect ) ) {
log . info ( "================== [Filter collect is ready, collect size is {} ] ================================" , filterCollect . size ( ) ) ;
collect = collect . stream ( ) . filter ( one - > ! filterCollect . contains ( one . getUid ( ) ) ) . collect ( Collectors . toList ( ) ) ;
}
}
return collect ;
}
private String getPreFilterPath ( SendBigDataDTO sendBigDataDTO ) {
String resultFilePath = null ;
String onlyName = sendBigDataDTO . getOnlyName ( ) ;
if ( StringUtils . isNotBlank ( onlyName ) ) {
TaskRecordQueryCriteria queryCriteria = new TaskRecordQueryCriteria ( ) ;
queryCriteria . setTaskName ( onlyName ) ;
List < TaskRecordDto > taskRecordDtos = taskRecordService . queryAll ( queryCriteria ) ;
if ( CollectionUtil . isNotEmpty ( taskRecordDtos ) & & taskRecordDtos . size ( ) = = 1 ) {
resultFilePath = taskRecordDtos . get ( 0 ) . getLocalFilePath ( ) ;
}
}
return resultFilePath ;
}
private List < String > getRemoteRecFromLocal ( BuildRecordDto buildRecordDto ) {
String localFilePath = buildRecordDto . getLocalFilePath ( ) ;
if ( StringUtils . isBlank ( localFilePath ) ) {
log . error ( "SendBigDataTask|getRemoteRecFromLocal , localFilePath is null!" ) ;
}
return FileUtil . readLines ( localFilePath , Charset . defaultCharset ( ) ) ;
}
private List < Tag > getTagsFromLocalDBRec ( BuildRecord resource , SendBigDataDTO sendBigDataDTO , BuildRecordDto buildRecordDto ) {
buildRecordDto . setTaskBuildId ( resource . getTaskBuildId ( ) ) ;
// 查询记录表,返回这个id去发送记录表中进行查找 - 构建发送体
TagQueryCriteria tagQueryCriteria = new TagQueryCriteria ( ) ;
tagQueryCriteria . setTaskId ( Long . valueOf ( resource . getId ( ) ) ) ;
tagQueryCriteria . setPushStatus ( NON_FINISH_SEND_STATATUS ) ;
// 加入分页防止查不动的情况 这里可以分页进行查询 - 目前不进行优化 - 为全查
Pageable pageable = PageRequest . of ( 0 , sendBigDataDTO . getLimit ( ) . intValue ( ) ) ;
List < Tag > backContent = tagService . queryAllBySlice ( tagQueryCriteria , pageable ) ;
// 遍历查询等待发送的列表
return backContent . stream ( )
. distinct ( )
. collect ( Collectors . toList ( ) ) ;
}
private AtomicLong batchSend ( List < Tag > collect , SendBigDataDTO sendRecordDTO , BuildRecordDto buildRecordDto ) {
AtomicLong successCount = new AtomicLong ( 0 L ) ;
List < List < Tag > > partition = Lists . partition ( collect , SEND_LIMIT ) ;
@ -202,7 +267,7 @@ public class SendBigDataTask {
// 进行Json解析
String jsonStr = JSON . toJSONString ( pushDBJsonContent ) ;
if ( StringUtils . isNotBlank ( jsonStr ) ) {
log . info ( "============ [ Pre send Json is : {} ] ============", jsonStr ) ;
log . info ( "============ [ batchSend Pre send Json is : {} ] ============", jsonStr ) ;
int count = 1 ;
// 失败重发请求3次
String address = "" ;
@ -240,6 +305,59 @@ public class SendBigDataTask {
return successCount ;
}
private AtomicLong batchSendRemote ( List < String > collect , SendBigDataDTO sendRecordDTO , BuildRecordDto buildRecordDto ) {
AtomicLong successCount = new AtomicLong ( 0 L ) ;
List < List < String > > partition = Lists . partition ( collect , SEND_LIMIT ) ;
partition . forEach (
list - > {
// 调用推送地址进行推送
PushDBJsonContent pushDBJsonContent = new PushDBJsonContent ( ) ;
pushDBJsonContent . setActId ( buildRecordDto . getTaskBuildId ( ) ) ;
pushDBJsonContent . setActName ( sendRecordDTO . getSendName ( ) ) ;
// 加入每一个号码对应接口字段
List < PushDBJsonContent . Client > clientList = new ArrayList < > ( ) ;
list . forEach (
each - > {
PushDBJsonContent . Client client = new PushDBJsonContent . Client ( ) ;
client . setCellphone ( each ) ;
clientList . add ( client ) ;
}
) ;
pushDBJsonContent . setClientList ( clientList ) ;
// 进行Json解析
String jsonStr = JSON . toJSONString ( pushDBJsonContent ) ;
if ( StringUtils . isNotBlank ( jsonStr ) ) {
log . info ( "============ [ batchSendRemote Pre send Json is : {} ] ============" , jsonStr ) ;
int count = 1 ;
// 失败重发请求3次
String address = "" ;
while ( count < = 3 ) {
// 对发送请求地址进行准备
String addressTag = sendRecordDTO . getAddressTag ( ) ;
if ( StringUtils . isNotBlank ( addressTag ) ) {
address = preSendReqAddress ( addressTag ) ;
log . info ( "========== [DB request address is {} ] =========" , address ) ;
}
// 调用HTTP请求发送数据
HttpResponse httpResponse = HttpUtil . sendPostReq ( address , jsonStr ) ;
if ( httpResponse . isOk ( ) & & httpResponse . body ( ) . contains ( "true" ) ) {
log . info ( "========== [DB request success, response is {} ] ==========" , httpResponse . body ( ) ) ;
successCount . addAndGet ( list . size ( ) ) ;
break ;
} else {
count + + ;
log . error ( "========== [DB request fail, response is {} ] ==========" , httpResponse . body ( ) ) ;
}
}
if ( count > 3 ) {
log . error ( "========== [DB update send status fail, url is {} ] ==========" , address ) ;
}
}
}
) ;
return successCount ;
}
private String preSendReqAddress ( String tag ) {
StringBuilder builder = new StringBuilder ( ) ;
// 处理环境选择问题, 只接受单个 a, b, c 中的任意一个输入,如果不输入就进行随机返回