一次业务的批量数据任务的处理优化

07-14 1312阅读

文章目录

  • 一次业务的批量数据任务的处理优化
    • 业务背景
    • 1.0版本 分批处理模式
    • 2.0版本 平衡任务队列模式
    • 3.0版本 优化调度平衡任务队列模式
    • 总结

      一次业务的批量数据任务的处理优化

      业务背景

      一个重新生成所有客户的财务业务指标数据的批量数据处理任务。

      一次业务的批量数据任务的处理优化
      (图片来源网络,侵删)

      1.0版本 分批处理模式

      根据要处理的客户数量,按照最大线程数切分成多个段,尽量保证每个线程处理相同的客户数量。

          private void updateForRegenerateByCustomer(List customerIdList,
              SystemUserCommonDTO user, LocalDateTime now) {
              List baseInfoList = CollectionUtils.isEmpty(customerIdList)?customerInfoService.listAll():
                  customerInfoService.listByIdList(customerIdList);
              //先清理客户的数据
              updateForCleanByCustomerIdList(baseInfoList,user,now);
              int maxSize = baseInfoList.size();
              //计算当前任务数量
              int currentMaxPoolSize = maxPoolSize
                  final int begin = i * size;
                  final int end = i==currentMaxPoolSize-1?maxSize:(i+1)*size;
                  //创建异步处理的分段任务
                  tasks[i] = CompletableFuture.runAsync(
                      ()-updateForGenerateByCustomerIdList(baseInfoList,begin,end,user,now)
                      ,executorService)
                      .whenCompleteAsync((k,v)- log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",Thread.currentThread().getName()));
              }
              // 向线程池提交任务
              CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成",tasks.length)).join();
          }
      	/**
      	  * 生成指定客户列表的所有数据
      	  **/
          private void updateForGenerateByCustomerIdList(List baseInfoList,int begin,int end,
              SystemUserCommonDTO user, LocalDateTime now){
              //每个线程只处理自己的分段的数据
              for(int i=begin;i
                  CustomerBaseInfo baseInfo = baseInfoList.get(i);
                  //每个客户独立事务
                  TransactionalUtils.runWithNewTransactional(
                      ()-updateForGenerateByCustomerId(baseInfo.getId(),user,now));
              }
          }
      	/**
      	* 生成指定客户的所有数据
      	**/
      	private void updateForGenerateByCustomerId(Integer customerId,SystemUserCommonDTO user, LocalDateTime now){
              //1、重新生成客户的所有业务类型的数据
              List maintainDtoList =
                  financeBiBusinessTypeSupport.getMaintainListByCustomerId(customerId);
              if(CollectionUtils.isEmpty(maintainDtoList)){
                  return ;
              }
              //生成每个指标的数据
              Map indicatorMaintainDtoMap = maintainDtoList
                  .stream().collect(Collectors.groupingBy(FinanceBiMaintainDto::getIndicator));
              indicatorMaintainDtoMap.forEach((k,v)->{
                  log.info("重新生成财务业务指标指定客户【{}】的【{}】支持处理开始",customerId,k);
                  financeBiManager.updateForBiMaintain(k, v,user,now);
              });
          }
      

      运行耗时:1420.145秒

      2.0版本 平衡任务队列模式

      1.0 版本 由于不同客户的数据量不同,导致生成数据的耗时不同,因此按照客户数量均分任务的的方式对于每个线程来说,任务量是不一样的,因此可能会导致部分线程太忙,部分线程太空的情况。因此调整为使用队列方式来解决任务分配的问题,每个线程自己取队列中取要处理的客户,直到所有队列中的客户都被处理完,所有的线程结束。这样就避免的线程任务量不平衡问题。

      updateForGenerateByCustomerId 方法不需要改造,只需要调整任务分配的相关方法就可以。

      private void updateForRegenerateByCustomer(List customerIdList, SystemUserCommonDTO user,
              LocalDateTime now) {
              List baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :
                  customerInfoService.listByIdList(customerIdList);
              //先清理客户的数据
              updateForCleanByCustomerIdList(baseInfoList, user, now);
              int maxSize = baseInfoList.size();
              int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);
              //根据线程数,构建固定的任务数量
              CompletableFuture[] tasks = new CompletableFuture[currentMaxPoolSize];
      		//构建待处理的客户队列,由于这里没有并发读写的情况,因此用ConcurrentLinkedQueue效率会更高一点。
              ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(
                  baseInfoList.stream().map(CustomerBaseInfo::getId).collect(Collectors.toList()));
      		//创建多个线程去消耗客户队列
              for (int i = 0; i  updateForGenerateByCustomerIdList(queue, user, now), executorService)
                          .whenCompleteAsync((k, v) -> {
                              if (v != null) {
                                  log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常",
                                      Thread.currentThread().getName()), v);
                              } else {
                                  log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",
                                      Thread.currentThread().getName());
                              }
                          });
              }
              // 向线程池提交任务
              CompletableFuture.allOf(tasks)
                  .whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length))
                  .join();
          }
      	/**
      	  * 生成指定客户列表的所有数据
      	  **/
          private void updateForGenerateByCustomerIdList(ConcurrentLinkedQueue queue, SystemUserCommonDTO user,
              LocalDateTime now) {
              Integer customerId = queue.poll();
              //循环从客户队列中取出待处理的客户,直到所有客户都处理完毕。
              while (customerId != null) {
                  final Integer currentCustomerId = customerId;
                  TransactionalUtils.runWithNewTransactional(
                      () -> updateForGenerateByCustomerId(currentCustomerId, user, now));
                  customerId = queue.poll();
              }
          }
          
      

      优化后的耗时:1037.059秒

      3.0版本 优化调度平衡任务队列模式

      2.0版本虽然解决的了每个线程任务量不平衡的问题,但可能出现某个数据量很大的客户在队列的尾部,导致当其他线程都处理完所有的客户时,取到最大数据量的客户的线程仍在运行,任务整体的耗时被增加。因此需要优化调度,将耗时高的客户调度到队列头部,保证耗时最长的客户的优先处理,从而避免最后等待耗时长的线程。

      updateForGenerateByCustomerIdList 方法不需要改造,只需要队列构造处理就可以。

      private void updateForRegenerateByCustomer(List customerIdList, SystemUserCommonDTO user,
              LocalDateTime now) {
              List baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :
                  customerInfoService.listByIdList(customerIdList);
              //先清理客户的数据
              updateForCleanByCustomerIdList(baseInfoList, user, now);
      		//获取客户的统计数据
              Map customerStatisticsInfoMap =
                  customerStatisticsInfoService.listAll().stream()
                      .collect(Collectors.toMap(CustomerStatisticsInfo::getCustomerId, Function.identity()));
              int maxSize = baseInfoList.size();
              int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);
              CompletableFuture[] tasks = new CompletableFuture[currentMaxPoolSize];
      		
      		//根据客户的统计数据,构建待处理的客户队列
              ConcurrentLinkedQueue queue =
                  baseInfoList.stream().map(item -> customerStatisticsInfoMap.get(item.getId())).filter(Objects::nonNull)
      //队列按照客户数据量倒序排列  
                   .sorted(Comparator.comparing(CustomerStatisticsInfo::getNumberOfCheckedSatisfactoryActivitys,
                          Comparator.reverseOrder())).map(CustomerStatisticsInfo::getCustomerId)
                      .collect(Collectors.toCollection(ConcurrentLinkedQueue::new));
              for (int i = 0; i  {
                      updateForGenerateByCustomerIdList(queue, user, now);
                      return Thread.currentThread().getName();
                  }, executorService).whenCompleteAsync((k, ex) -> {
                      if (ex != null) {
                          log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常", k), ex);
                      } else {
                          log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成", k);
                      }
                  });
              }
              // 向线程池提交任务
              CompletableFuture.allOf(tasks)
                  .whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length))
                  .join();
          }
      

      耗时:726.725秒

      总结

      最终的耗时从1400多秒 降低到700多秒。降低了一半左右。

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]