Hive大表join大表如何调优

06-30 1628阅读

目录

  • 一、调优思路
    • 1、SQL优化
      • 1.1 大小表join
      • 1.2 大大表join
      • 2、insert into替换union all
      • 3、排序order by换位sort by
      • 4、并行执行
      • 5、数据倾斜优化
      • 6、小文件优化
      • 二、实战
        • 2.1 场景
        • 2.2 限制所需的字段,间接mapjoin
        • 2.2 解决异常值倾斜,如NULL加随机数打散
        • 2.3 扩容解决数据倾斜
          • 2.3.1 客户表扩大N倍
          • 2.3.2 部分倾斜key扩容,大卖家扩容
          • 2.3.3 推荐:分而治之:倾斜和非倾斜再union all

            在Hive中,优化器会根据统计信息决定是将大表放在前面(Join的左边)还是小表放在前面。通常,优化器会选择数据量较小的表作为驱动表(小表作为左边),因为这样可以减少内存消耗并提高效率。

            但是,如果你有特定的需求,比如你知道大部分数据能快速过滤掉,希望减少任务的执行时间,那么你可以强制指定某个表作为小表。在Hive中,可以使用/*+ MAPJOIN(table_name) */ 注释来强制将一个大表作为小表处理。

            Hive大表join大表如何调优
            (图片来源网络,侵删)

            例如,如果你想要将big_table作为小表:

            SELECT /*+ MAPJOIN(big_table) */
              a.column1, a.column2, b.column1, b.column2
            FROM
              small_table a
            JOIN
              big_table b
            ON
              a.common_column = b.common_column;
            

            一、调优思路

            1、SQL优化

            1.1 大小表join

            1、mapjoin,小表使用mapjoin,或者强制hint

            2、将大表放后头,原因:Hive假定查询中最后的一个表是大表。它会将其它表缓存起来,然后扫描最后那个表。因此通常需要将小表放前面,或者标记哪张表是大表:/*streamtable(table_name) */

            3、过滤无效值:空值、不使用的字段等。

            4、不能过滤的空值,将空值转化为随机数避免数据倾斜。

            1.2 大大表join

            1)创建第二张大表
            create table bigtable2(
                id bigint,
                t bigint,
                uid string,
                keyword string,
                url_rank int,
                click_num int,
                click_url string)
            row format delimited fields terminated by '\t';
            load data local inpath '/opt/module/data/bigtable' into table bigtable2;
            2)测试大表直接JOIN
            insert overwrite table jointable
            select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
            from bigtable a
            join bigtable2 b
            on a.id = b.id;
            测试结果:Time taken: 72.289 seconds
            insert overwrite table jointable
            select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
            from bigtable a
            join bigtable2 b
            on a.id = b.id;
            3)创建分桶表1
            create table bigtable_buck1(
                id bigint,
                t bigint,
                uid string,
                keyword string,
                url_rank int,
                click_num int,
                click_url string)
            clustered by(id)
            sorted by(id)
            into 6 buckets
            row format delimited fields terminated by '\t';
            load data local inpath '/opt/module/data/bigtable' into table bigtable_buck1;
            4)创建分桶表2,分桶数和第一张表的分桶数为倍数关系
            create table bigtable_buck2(
                id bigint,
                t bigint,
                uid string,
                keyword string,
                url_rank int,
                click_num int,
                click_url string)
            clustered by(id)
            sorted by(id)
            into 6 buckets
            row format delimited fields terminated by '\t';
            load data local inpath '/opt/module/data/bigtable' into table bigtable_buck2;
            5)设置参数
            set hive.optimize.bucketmapjoin = true;
            set hive.optimize.bucketmapjoin.sortedmerge = true;
            set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
            6)测试 Time taken: 34.685 seconds
            insert overwrite table jointable
            select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
            from bigtable_buck1 s
            join bigtable_buck2 b
            on b.id = s.id;
            

            1、使用相同的连接键

            • 当对3个或者更多个表进行join连接时,如果每个on子句都使用相同的连接键的话,那么只会产生一个MapReduce job。

              2、过滤无效、未使用的数据:减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段。

              加随机数打散
              1)空值0值 或 关联不上的,用随机数
              from a join b
              on if(a.key=’’, rand(id)%10, a.key)=b.key
              –rand() 0-1之间的小数
              (2)都是有用的key,则加随机数后缀
              group by concat(key, cast(round(rand()*10) as int))
              缺点是分成10份是提前写好的,数据变更大时,还是会跑得慢。
              

              3、逻辑拆分,使用中间表计算

              • 尽量原子化操作:多个表关联时,避免包含复杂逻辑大sql(因为无法控制中间job),最好分拆成小段,可以使用中间表来完成复杂的逻辑
              • 写入HDFS之后:多次INSERT OVERWRITE TABLE写法参考:spark调优-小文件问题

                4、列裁剪,避免使用select * 如果查询的是分区表,一定要记得带上分区条件

                5、where条件写在子查询中:先过滤再关联(最好使用这种笨办法,虽然hive3.0自带谓词下推)

                6、关联条件写在on中,而不是where中

                • 非主表谓词下推情况下,可以理解为where是全部执行完在reduce中进行过滤,on是在关联过程中filter

                  7、数据量小时,用in代替join

                  8、使用semi join替代in/exists

                  inner join和left semi join的联系和区别

                  2、insert into替换union all

                  如果union all的部分个数大于2,或者每个union部分数据量大,应该拆成多个insert into 语句,效率有提升。

                  ?insert into到不同分区?

                  3、排序order by换位sort by

                  order by:对输入做全局排序,因此只有一个reducer(多个reducer无法保证全局有序),只有一个reducer,会导致当输入规模较大时,需要较长的计算时间。

                  sort by:局部排序,保证每个reducer的输出文件是有序的。

                  hive order by,sort by, distribute by, cluster by作用以及用法

                  4、并行执行

                  Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。或者Hive执行过程中可能需要的其他阶段。

                  默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。如果有更多的阶段可以并行执行,那么job可能就越快完成。

                  通过设置参数hive.exec.parallel值为true,就可以开启并发执行。在共享集群中,需要注意下,如果job中并行阶段增多,那么集群利用率就会增加。

                  set hive.exec.parallel=true; //打开任务并行执行
                  set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。
                  set hive.exec.parallel=true
                  

                  5、数据倾斜优化

                  spark-数据倾斜、

                  hadoop-hive-数据倾斜问题

                  6、小文件优化

                  spark调优-小文件问题

                  参考链接

                  HiveSQL大表join大表数据倾斜

                  二、实战

                  添加链接描述

                  2.1 场景

                  【背景】

                  A表为一个汇总表,汇总的是卖家买家最近N天交易汇总信息,即对于每个卖家最近N天,其每个买家共成交了多少单,总金额是多少,假设N取90天,汇总值仅取成交单数。

                  A表的字段有:buyer_id、seller_id、pay_cnt_90day。

                  B表为卖家基本信息表,其字段有seller_id、sale_level,其中sale_levels是卖家的一个分层评级信息,比如吧卖家分为6个级别:S0、S1、S2、S3、S4和S5。

                  要获得的结果是每个买家在各个级别的卖家的成交比例信息,比如:某买家:S0:10%;S1:20%;S2:20%;S3:10%;S4:20%;S5:10%。

                  【初始思路】

                  第一反应是直接join两表并统计:

                  	select
                           m.buyer_id,
                          sum(pay_cnt_90day)  as pay_cnt_90day,
                          sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
                          sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
                          sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
                          sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
                          sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
                          sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
                        from (
                          select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
                          from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a
                          join
                                 (select seller_id,  sale_level  from table_B)  b
                          on  a.seller_id  = b.seller_id
                          )  m
                        group by m.buyer_id
                  

                  但是此SQL会引起数据倾斜,原因在于卖家的二八准则,某些卖家90天内会有几百万甚至上千万的买家,但是大部分的卖家90天内买家的数目并不多,join table_A和table_B的时候,ODPS会按照seller_id进行分发,table_A的大卖家引起了数据倾斜。但是数据本身无法用mapjoin table_B解决,因为卖家超过千万条,文件大小有几个GB,超过了1GB的限制。

                  2.2 限制所需的字段,间接mapjoin

                  思路:只看90天内有交易的卖家,不join全部的卖家表

                  局限:此方案在一些情况可以起作用,但是很多时候还是无法解决上述问题,因为大部分卖家尽管90天内买家不多,但还是有一些的,过滤后的B表仍然很多。

                   select
                           m.buyer_id,
                          sum(pay_cnt_90day)  as pay_cnt_90day,
                          sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
                          sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
                          sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
                          sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
                          sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
                          sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
                        from ( 
                          select  /*+mapjoin(b)*/
                            a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
                          from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a
                          join  (
                  		           select seller_id,  sale_level  from table_B b0
                  		           join
                  		           (select seller_id from table_A group by seller_id) a0
                  		             on b0.seller_id = a0.selller_id
                            )  b
                          on  a.seller_id  = b.seller_id
                          )  m
                        group by m.buyer_id   
                  

                  2.2 解决异常值倾斜,如NULL加随机数打散

                  **思路:**核心是将这些引起倾斜的值随机分发到Reduce,join时对这些特殊值concat随机数,从而达到随机分发的目的。

                  **适用于:**倾斜的值是明确的而且数量很少,比如null值引起的倾斜。

                  **局限:**无法解决本问题场景的倾斜问题,因为倾斜的卖家大量存在而且动态变化。

                  此方案的核心逻辑如下:

                  		select a.user_id, a.order_id, b.user_id
                        from table_a a 
                        join table_b b
                        on (case when a.user_is is null then concat('hive', rand(id)) else a.user_id end) = b.user_id
                  

                  Hive 已对此进行了优化,只需要设置参数skewinfo和skewjoin参数,不修改SQL代码,例如,由于table_B的值“0” 和“1”引起了倾斜,值需要做如下设置:

                  set hive.optimize.skewinfo=table_B:(selleer_id) [ ( "0") ("1") ) ] 
                  set hive.optimize.skewjoin = true;
                  

                  2.3 扩容解决数据倾斜

                  推荐”2.3.3 推荐:分而治之:倾斜和非倾斜再union all“,可直接看。若不行,推荐方案2.3.2倾斜key扩容。

                  2.3.1 客户表扩大N倍

                  思路:按照seller_id分发会倾斜,那么再人工增加一列进行分发,这样之前倾斜的值的倾斜程度会减少到原来的1/10,可以通过配置numbers表改放大倍数来降低倾斜程度。

                  代码实现:建立一个numbers表,其值只有一列int 行,比如从1到10(具体值可根据倾斜程度确定),然后放大B表10倍,再取模join。

                  局限性:数据量翻倍,B表也会膨胀N倍。

                  SELECT m.buyer_id,
                           sum(pay_cnt_90day) AS pay_cnt_90day,
                           sum(case  WHEN m.sale_level = 0 THEN pay_cnt_90day end) AS pay_cnt_90day_s0, 
                           sum(case WHEN m.sale_level = 1 THEN pay_cnt_90day end) AS pay_cnt_90day_s1,
                           sum(case WHEN m.sale_level = 2 THEN  pay_cnt_90day end) AS pay_cnt_90day_s2, 
                       sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
                       sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
                       sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
                  FROM 
                      (SELECT a.buer_id,
                           a.seller_id,
                           b.sale_level,
                           a.pay_cnt_90day
                      FROM 
                          (SELECT buyer_id,
                           seller_id,
                           pay_cnt_90day
                          FROM table_A) a
                          JOIN -- 将B表扩容N倍
                              (SELECT /*+mapjoin(members)*/ seller_id,
                           sale_level ,
                          member
                              FROM table_B
                              JOIN members -- 扩容N倍的表
                               ) b
                            ON a.seller_id = b.seller_id
                                AND mod(a.pay_cnt_90day,10)+1 = b.number ) m
                          GROUP BY  m.buyer_id 
                  

                  2.3.2 部分倾斜key扩容,大卖家扩容

                  思路:把大卖家放大倍数即可:需要首先知道大卖家的名单,即先建立一个临时表动态存放每天最新的大卖家(比如dim_big_seller),同时此表的大卖家要膨胀预先设定的倍数(1000倍)。

                  代码实现:在A表和B表分别新建一个join列,其逻辑为:如果是大卖家,那么concat一个随机分配正整数(0到预定义的倍数之间,本例为0~1000);如果不是,保持不变。

                  局限性: 相比全部数据扩容,仅倾斜指标扩容的运行效率有提升,但代码复杂性高,必须首先建立大数据表。

                  SELECT m.buyer_id,
                           sum(pay_cnt_90day) AS pay_cnt_90day,
                           sum(case  WHEN m.sale_level = 0 THEN pay_cnt_90day end) AS pay_cnt_90day_s0, 
                           sum(case WHEN m.sale_level = 1 THEN pay_cnt_90day end) AS pay_cnt_90day_s1,
                           sum(case WHEN m.sale_level = 2 THEN  pay_cnt_90day end) AS pay_cnt_90day_s2, 
                       sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
                       sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
                       sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
                      pay_cnt_90day end) AS pay_cnt_90day_s5
                  FROM 
                      (SELECT a.buer_id,
                           a.seller_id,
                           b.sale_level,
                           a.pay_cnt_90day
                      FROM 
                          (SELECT /*+mapjoin(big)*/ buyer_id,
                           seller_id,
                           pay_cnt_90day,
                           if(big.seller_id is NOT null,
                           concat( table_A.seller_id,
                           'rnd', cast( rand() * 1000 AS bigint ), table_A.seller_id) AS seller_id_joinkey
                          FROM table_A left outer
                          JOIN --big表seller_id有重复,请注意一定要group by 后再join,保证table_A的行数保持不变 
                          (  SELECT seller_id
                               FROM dim_big_seller
                               GROUP BY  seller_id
                           )big
                              ON table_A.seller_id = big.seller_id ) a
                          JOIN 
                              (SELECT /*+mapjoin(big)*/ seller_id,
                                       sale_level ,
                                    --big表的seller_id_joinkey生成逻辑和上面的生成逻辑一样 coalesce(seller_id_joinkey,
                                       table_B.seller_id) AS seller_id_joinkey
                              FROM table_B 
                              left out JOIN --table_B表join大卖家表后大卖家行数扩大1000倍,其它卖家行数保持不变 
                                  (SELECT seller_id,
                                           seller_id_joinkey
                                  FROM dim_big_seller
                                  ) big
                                      ON table_B.seller_id= big.seller_id ) b
                                      ON a.seller_id_joinkey= b.seller_id_joinkey
                                          AND mod(a.pay_cnt_90day,10)+1 = b.number ) m
                                  GROUP BY  m.buyer_id
                  

                  2.3.3 推荐:分而治之:倾斜和非倾斜再union all

                  思路:对倾斜的键值和不倾斜的键值分开处理,不倾斜的正常join即可,倾斜的把他们找出来做mapjoin,最后union all其结果即可。

                  代码实现:

                  局限性:较麻烦,代码复杂而且需要一个临时表存放倾斜的键值。

                  --1、构建临时表,由于数据倾斜,先找出90天买家超过10000的卖家 
                  insert overwrite table temp_table_B
                  SELECT m.seller_id,
                           n.sale_level
                  FROM 
                      (SELECT seller_id
                      FROM 
                          (SELECT seller_id,
                          count(buyer_id) AS byr_cnt
                          FROM table_A
                          GROUP BY  seller_id ) a
                          WHERE a.byr_cnt >10000 ) m
                      LEFT JOIN 
                      (SELECT seller_id,
                           sale_level
                      FROM table_B 
                      ) n
                      ON m.seller_id = n.seller_id; 
                      
                      
                   --2、分而治之,不倾斜union all 倾斜。
                   --对于90天买家超过10000的卖家直接mapjoin,对其它卖家直接正常join即可
                   。SELECT m.buyer_id,
                           sum(pay_cnt_90day) AS pay_cnt_90day,
                           sum(case  WHEN m.sale_level = 0 THEN pay_cnt_90day end) AS pay_cnt_90day_s0, 
                           sum(case WHEN m.sale_level = 1 THEN pay_cnt_90day end) AS pay_cnt_90day_s1,
                           sum(case WHEN m.sale_level = 2 THEN  pay_cnt_90day end) AS pay_cnt_90day_s2, 
                       sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
                       sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
                       sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
                  FROM 
                      (SELECT a.buer_id,
                           a.seller_id,
                           b.sale_level,
                           a.pay_cnt_90day
                      FROM 
                          (SELECT buyer_id,
                           seller_id,
                           pay_cnt_90day
                          FROM table_A
                          ) a
                          JOIN 
                              (SELECT seller_id,
                                      a.sale_level
                              FROM table_A a
                              LEFT JOIN temp_table_B b
                                  ON a.seller_id = b.seller_id
                              WHERE b.seller_id is  NULL -- 限制为不倾斜的卖家
                              ) b
                                  ON a.seller_id = b.seller_id
                         UNION all
                         SELECT /*+mapjoin(b)*/ a.buer_id,
                           a.seller_id,
                           b.sale_level,
                           a.pay_cnt_90day
                              FROM 
                                  (SELECT buyer_id,
                                           seller_id,
                                           pay_cnt_90day
                                  FROM table_A
                                  ) a
                                  JOIN 
                                      (SELECT seller_id,
                                              sale_level
                                      FROM table_B -- 只看倾斜卖家
                                      ) b
                                 ON a.seller_id = b.seller_id
                                 ) m
                                  GROUP BY  m.buyer_id ) m
                              GROUP BY  m.buyer_id 
                  
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]