@ -12,6 +12,8 @@ import me.zhengjie.modules.buildrecord.domain.BuildRecord;
import me.zhengjie.modules.buildrecord.service.BuildRecordService ;
import me.zhengjie.modules.buildrecord.service.dto.BuildRecordDto ;
import me.zhengjie.modules.buildrecord.task.dto.SendBigDataDTO ;
import me.zhengjie.modules.remoterec.domain.RemoteRecord ;
import me.zhengjie.modules.remoterec.service.RemoteRecordService ;
import me.zhengjie.modules.tag.domain.Tag ;
import me.zhengjie.modules.tag.service.TagService ;
import me.zhengjie.modules.tag.service.dto.TagDto ;
@ -43,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors ;
import static me.zhengjie.modules.common.consts.SendBigDataConst.NON_FINISH_SEND_STATATUS ;
import static me.zhengjie.modules.remoterec.consts.RemoteRecConst.SUCCESS_TAG ;
@Component
@Scope ( "prototype" )
@ -84,17 +87,19 @@ public class SendBigDataTask {
private TagService tagService ;
@Autowired
private TaskRecordService taskRecordService ;
@Autowired
private RemoteRecordService remoteRecordService ;
@Async ( value = "SendBigDataTaskExecutor" )
public void doRunTask ( BuildRecord resource , SendBigDataDTO sendBigDataDTO ) {
public void doRunTask ( BuildRecord resource , RemoteRecord remoteRecord , SendBigDataDTO sendBigDataDTO ) {
Long satrtMilliSecond = LocalDateTime . now ( ) . toInstant ( ZoneOffset . of ( "+8" ) ) . toEpochMilli ( ) ;
log . info ( "====== [ task start running, task name is {} ] ======" , "SendBigDataTask" ) ;
runTask ( resource , sendBigDataDTO) ;
runTask ( resource , remoteRecord, sendBigDataDTO) ;
Long endMilliSecond = LocalDateTime . now ( ) . toInstant ( ZoneOffset . of ( "+8" ) ) . toEpochMilli ( ) ;
log . info ( "====== [ task start end, task name is {},cost milliSecond is {} ] ======" , "SendBigDataTask" , ConvertUtil . secondToTime ( endMilliSecond - satrtMilliSecond ) ) ;
}
private void runTask ( BuildRecord resource , SendBigDataDTO sendBigDataDTO ) {
private void runTask ( BuildRecord resource , RemoteRecord remoteRecord , SendBigDataDTO sendBigDataDTO ) {
// 根据发送任务的Id来读取发送号码表
Integer id = resource . getId ( ) ;
@ -128,7 +133,7 @@ public class SendBigDataTask {
remoteCollect = getNotDuplicateRemoteContent ( remoteCollect , resultFilePath ) ;
// 批量发送信息并且更新记录
AtomicLong atomicLong = batchSendRemote ( remoteCollect , sendBigDataDTO , buildRecordDto ) ;
finishSendThenUpdateRe c( resource , buildRecordDto , atomicLong ) ;
finishSendThenUpdateRe moteRec( remoteRecord , atomicLong ) ;
}
// 乱序 ( 乱序的逻辑写在SQL语句中)
@ -164,6 +169,18 @@ public class SendBigDataTask {
}
}
private void finishSendThenUpdateRemoteRec ( RemoteRecord remoteRecord , AtomicLong atomicLong ) {
// 对发送后的状态进行更新
RemoteRecord remote = new RemoteRecord ( ) ;
BeanUtil . copyProperties ( remoteRecord , remote ) ;
remote . setTag ( SUCCESS_TAG ) ;
remote . setWeight ( 10 ) ;
remote . setSuccessSendCount ( atomicLong . get ( ) ) ;
remoteRecordService . update ( remote ) ;
}
private List < String > getNotDuplicateRemoteContent ( List < String > remoteCollect , String resultFilePath ) {
if ( StringUtils . isNotBlank ( resultFilePath ) ) {
Set < String > fileLines = new HashSet < > ( 50_0000 ) ;