一次业务的批量数据任务的处理优化
文章目录
- 一次业务的批量数据任务的处理优化
- 业务背景
- 1.0版本 分批处理模式
- 2.0版本 平衡任务队列模式
- 3.0版本 优化调度平衡任务队列模式
- 总结
一次业务的批量数据任务的处理优化
业务背景
一个重新生成所有客户的财务业务指标数据的批量数据处理任务。
(图片来源网络,侵删)1.0版本 分批处理模式
根据要处理的客户数量,按照最大线程数切分成多个段,尽量保证每个线程处理相同的客户数量。
private void updateForRegenerateByCustomer(List customerIdList, SystemUserCommonDTO user, LocalDateTime now) { List baseInfoList = CollectionUtils.isEmpty(customerIdList)?customerInfoService.listAll(): customerInfoService.listByIdList(customerIdList); //先清理客户的数据 updateForCleanByCustomerIdList(baseInfoList,user,now); int maxSize = baseInfoList.size(); //计算当前任务数量 int currentMaxPoolSize = maxPoolSize final int begin = i * size; final int end = i==currentMaxPoolSize-1?maxSize:(i+1)*size; //创建异步处理的分段任务 tasks[i] = CompletableFuture.runAsync( ()-updateForGenerateByCustomerIdList(baseInfoList,begin,end,user,now) ,executorService) .whenCompleteAsync((k,v)- log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",Thread.currentThread().getName())); } // 向线程池提交任务 CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成",tasks.length)).join(); } /** * 生成指定客户列表的所有数据 **/ private void updateForGenerateByCustomerIdList(List baseInfoList,int begin,int end, SystemUserCommonDTO user, LocalDateTime now){ //每个线程只处理自己的分段的数据 for(int i=begin;i CustomerBaseInfo baseInfo = baseInfoList.get(i); //每个客户独立事务 TransactionalUtils.runWithNewTransactional( ()-updateForGenerateByCustomerId(baseInfo.getId(),user,now)); } } /** * 生成指定客户的所有数据 **/ private void updateForGenerateByCustomerId(Integer customerId,SystemUserCommonDTO user, LocalDateTime now){ //1、重新生成客户的所有业务类型的数据 List maintainDtoList = financeBiBusinessTypeSupport.getMaintainListByCustomerId(customerId); if(CollectionUtils.isEmpty(maintainDtoList)){ return ; } //生成每个指标的数据 Map indicatorMaintainDtoMap = maintainDtoList .stream().collect(Collectors.groupingBy(FinanceBiMaintainDto::getIndicator)); indicatorMaintainDtoMap.forEach((k,v)->{ log.info("重新生成财务业务指标指定客户【{}】的【{}】支持处理开始",customerId,k); financeBiManager.updateForBiMaintain(k, v,user,now); }); }
运行耗时:1420.145秒
2.0版本 平衡任务队列模式
1.0 版本 由于不同客户的数据量不同,导致生成数据的耗时不同,因此按照客户数量均分任务的的方式对于每个线程来说,任务量是不一样的,因此可能会导致部分线程太忙,部分线程太空的情况。因此调整为使用队列方式来解决任务分配的问题,每个线程自己取队列中取要处理的客户,直到所有队列中的客户都被处理完,所有的线程结束。这样就避免的线程任务量不平衡问题。
updateForGenerateByCustomerId 方法不需要改造,只需要调整任务分配的相关方法就可以。
private void updateForRegenerateByCustomer(List customerIdList, SystemUserCommonDTO user, LocalDateTime now) { List baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() : customerInfoService.listByIdList(customerIdList); //先清理客户的数据 updateForCleanByCustomerIdList(baseInfoList, user, now); int maxSize = baseInfoList.size(); int currentMaxPoolSize = Math.min(maxPoolSize, maxSize); //根据线程数,构建固定的任务数量 CompletableFuture[] tasks = new CompletableFuture[currentMaxPoolSize]; //构建待处理的客户队列,由于这里没有并发读写的情况,因此用ConcurrentLinkedQueue效率会更高一点。 ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue( baseInfoList.stream().map(CustomerBaseInfo::getId).collect(Collectors.toList())); //创建多个线程去消耗客户队列 for (int i = 0; i updateForGenerateByCustomerIdList(queue, user, now), executorService) .whenCompleteAsync((k, v) -> { if (v != null) { log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常", Thread.currentThread().getName()), v); } else { log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成", Thread.currentThread().getName()); } }); } // 向线程池提交任务 CompletableFuture.allOf(tasks) .whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length)) .join(); } /** * 生成指定客户列表的所有数据 **/ private void updateForGenerateByCustomerIdList(ConcurrentLinkedQueue queue, SystemUserCommonDTO user, LocalDateTime now) { Integer customerId = queue.poll(); //循环从客户队列中取出待处理的客户,直到所有客户都处理完毕。 while (customerId != null) { final Integer currentCustomerId = customerId; TransactionalUtils.runWithNewTransactional( () -> updateForGenerateByCustomerId(currentCustomerId, user, now)); customerId = queue.poll(); } }
优化后的耗时:1037.059秒
3.0版本 优化调度平衡任务队列模式
2.0版本虽然解决的了每个线程任务量不平衡的问题,但可能出现某个数据量很大的客户在队列的尾部,导致当其他线程都处理完所有的客户时,取到最大数据量的客户的线程仍在运行,任务整体的耗时被增加。因此需要优化调度,将耗时高的客户调度到队列头部,保证耗时最长的客户的优先处理,从而避免最后等待耗时长的线程。
updateForGenerateByCustomerIdList 方法不需要改造,只需要队列构造处理就可以。
private void updateForRegenerateByCustomer(List customerIdList, SystemUserCommonDTO user, LocalDateTime now) { List baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() : customerInfoService.listByIdList(customerIdList); //先清理客户的数据 updateForCleanByCustomerIdList(baseInfoList, user, now); //获取客户的统计数据 Map customerStatisticsInfoMap = customerStatisticsInfoService.listAll().stream() .collect(Collectors.toMap(CustomerStatisticsInfo::getCustomerId, Function.identity())); int maxSize = baseInfoList.size(); int currentMaxPoolSize = Math.min(maxPoolSize, maxSize); CompletableFuture[] tasks = new CompletableFuture[currentMaxPoolSize]; //根据客户的统计数据,构建待处理的客户队列 ConcurrentLinkedQueue queue = baseInfoList.stream().map(item -> customerStatisticsInfoMap.get(item.getId())).filter(Objects::nonNull) //队列按照客户数据量倒序排列 .sorted(Comparator.comparing(CustomerStatisticsInfo::getNumberOfCheckedSatisfactoryActivitys, Comparator.reverseOrder())).map(CustomerStatisticsInfo::getCustomerId) .collect(Collectors.toCollection(ConcurrentLinkedQueue::new)); for (int i = 0; i { updateForGenerateByCustomerIdList(queue, user, now); return Thread.currentThread().getName(); }, executorService).whenCompleteAsync((k, ex) -> { if (ex != null) { log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常", k), ex); } else { log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成", k); } }); } // 向线程池提交任务 CompletableFuture.allOf(tasks) .whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length)) .join(); }
耗时:726.725秒
总结
最终的耗时从1400多秒 降低到700多秒。降低了一半左右。
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。