联 系 我 们
售前咨询
售后咨询
微信关注:星环科技服务号
更多联系方式 >
6.7.5 Skew Join
更新时间:12/17/2024, 7:25:16 AM

在数据密集型行业中,数据倾斜是一个常见且复杂的问题,其通常表现为某些键值对的分布极不平衡,例如表中的某个键值对应的记录量远超过其他键时,会导致连接(Join)操作中个别节点的处理时间过长,从而成为整个查询过程的瓶颈,影响整个系统的处理效率和性能。

面对数据倾斜问题,传统的方法主要依赖于 MapJoin 或手动改写 SQL 查询语句,然而,这两种方法都有其局限性:

  • MapJoin:要求至少有一侧的表足够小,能够将其加载进内存,过大的表可能引发内存溢出。

  • 改写 SQL:需要数据库管理员或数据工程师深入理解业务场景,不仅耗时且容易出错,常常需要反复调试。

面对这一挑战,ArgoDB 推出了 Skew Join 功能,旨在自动化处理数据倾斜问题,优化 Join 操作的性能。与传统方法相比,Skew Join 设置完成后不再需要用户介入,也不限于小表的连接优化,它能够自动处理大表之间的连接操作,有效减少查询响应时间。

适用场景
  • 大表与大表的连接操作:当两个较大的表进行连接操作时,如果连接键的分布不均匀,会造成数据倾斜。

  • 小表与大表的连接操作:即使是小表与大表的连接,如果大表中某些键值的数据量过大,也可能产生倾斜。

  • 键值分布极不均匀:当某些键值对应的记录数量远远超过其他键值时,常规的连接操作会导致数据倾斜。

工作原理
skew join principle
图 36. Skew Join 工作原理

Skew Join 针对倾斜的键值采取特殊处理,将倾斜键值的数据分散到多个节点上进行处理,从而避免单个节点上的负载过重。它的基本步骤包括:

表 24. 工作原理
操作 说明

自动处理倾斜优化

为表设置倾斜键值并开启自动识别后,无需每次都在 SQL 语句中加入 Hint 信息,可自动识别 SQL 并在执行时采用倾斜优化策略。

智能数据分发

对倾斜键值进行均匀分发,使其分布到多个计算节点上,非倾斜的键则正常执行 Shuffle 操作。

优化连接操作

并行处理倾斜键值的数据,并且优化执行计划来减轻单个节点的负载。

合并结果

将处理后的结果合并,以完成整个连接操作。

使用限制
  • 仅支持连接中的单张表存在数据倾斜的情况,例如:

    • Inner Join 左右任意一侧倾斜

    • Left Join 的左侧表倾斜

    • Right Join 的右侧表倾斜(实质与 Left Join 左表倾斜等价)

  • 连接条件需为等值连接,即 SQL 语句中 Join On 条件中存在形如 keyA = keyB 或类似条件,此时 keyA、keyB 就是 join key。

  • 对于子查询下的数据倾斜场景,需通过手动指定 Hint 方法来执行 Skew Join 优化策略,暂不支持自动识别。

使用案例
  1. 登录至 Transwarp Manager 平台。

  2. 修改 Quark 服务配置。

    本方法设置后的参数生效级别为全局,如仅需 Session 级生效,您可以在 SQL 命令行中,通过 SET 命令来完成配置,例如 SET quark.skew.join.auto.optimize = true

    1. 在页面左侧,单击仪表盘 > 集群,找到并单击 Quark 服务。

    2. 单击配置页签,然后单击页面右侧的添加自定义参数

    3. 在弹出的对话框中,根据下述说明完成参数配置,然后单击确定

      turn on skewjoin
      • 配置项:填写为 quark.skew.join.auto.optimize,即开启 Skew Join 优化的自动识别。

      • :填写为 true

      • 配置文件:选择为 hive-site.xml

    4. 参考上一步骤,依次添加下述参数,将其值设置为 true,配置文件选择为 hive-site.xml

      • quark.skewjoin.hint.enable:开启使用 Hint 语法手动指定倾斜键值信息。

      • quark.join.null.optimize:开启 Null 值倾斜优化。

    5. 单击页面右上角的配置服务,在弹出的对话框中单击确定

    6. 等待配置应用完成后,单击重启

      重启该服务可能导致业务短暂不可用,请在业务低峰期或运维窗口期操作。

  3. 通过 Beeline 登录 ArgoDB 数据库

  4. 确认表中的键值是否存在数据倾斜。

    下文以 store_returns 表为例,查看 sr_reason_sk 列是否存在值倾斜,您可以替换下述 SQL 中的表名和列名为真实环境中的名称。

    WITH partition_percentages AS (
    SELECT sr_reason_sk, COUNT(*) * 100.0 / SUM(COUNT(*)) OVER() AS percentage
    FROM store_returns
    GROUP BY sr_reason_sk
    )
    SELECT sr_reason_sk, percentage, 'top' AS distribution_type
    FROM partition_percentages
    ORDER BY percentage DESC
    LIMIT 5
    UNION ALL
    SELECT sr_reason_sk, percentage, 'bottom' AS distribution_type
    FROM partition_percentages
    ORDER BY percentage ASC
    LIMIT 5;
    复制

    返回结果如下,可以看到 sr_reason_sk 列中 15 的值占比约 68.42%,键值倾斜非常严重:

    +---------------+-----------------------+--------------------+
    | sr_reason_sk  | percentage            | distribution_type  |
    +---------------+-----------------------+--------------------+
    | 15            | 68.432423177848438    | top                |
    | 12            | 12.27944843355184288  | top                |
    | 19            | 3.51045386920265545   | top                |
    | NULL          | 3.5016363906611824    | top                |
    | 11            | 1.75668204429367795   | top                |
    | 16            | 1.75047265018885171   | bottom             |
    | 20            | 1.75147976668236379   | bottom             |
    | 18            | 1.75281680064788846   | bottom             |
    | 13            | 1.75372667830754421   | bottom             |
    | 8             | 1.75445249674597188   | bottom             |
    +---------------+-----------------------+--------------------+
    复制
  5. 为表增加倾斜的键值信息,设置完成后,ArgoDB 在执行 SQL 语句时会自动应用倾斜优化的策略。

    ALTER TABLE <table_name>
    SKEWED BY (<column_name1>, [column_name2], ...)
    ON (
        (<skew_value_1>, <skew_value_2>),
        (<skew_value_1>, <skew_value_2>),
        ...
    );
    复制

    参数说明

    参数 说明

    table_name

    存在数据倾斜的表名称。

    column_name1,column_name2 …​

    列名,多个列之间用英文逗号(,)分隔。

    skew_value_1,skew_value_2 …​

    倾斜的值,多个值之间用英文逗号(,)分隔,如果指定了多组倾斜值,则每组值需和指定的列顺序对应。

    使用示例

    下述案例中,我们为设置 store_returns 表设置了两个倾斜列(sr_reason_sk 和 sr_item_sk),它们对应的倾斜值分别是 10, 154370, 6601

    ALTER TABLE store_returns
    SKEWED BY (sr_reason_sk, sr_item_sk)
    ON ((10, 15), (4370, 6601));
    复制
  6. (可选)管理表的倾斜键值信息。

    • 查看表的 Skew Join 设置

      DESC FORMATTED <table_name>;
      复制

      在查询输出的结果中,通过 Skewed ColumnsSkewed Values 的值来查看。

    • 删除表的 Skew Join 设置

      ALTER TABLE <table_name> NOT SKEWED;
      复制
  7. 除通过设置表倾斜的键值信息,您还可以通过 Hint 方法,在执行 SQL 时手动指定倾斜的键值信息,具体语法如下。

    SELECT /*+SKEWJOIN(<table1_alias> (<column_name>) [(<skew_value>)],
                       <table2_alias> (<column_name>) [(<skew_value>)]...) */
    ...;
    复制
    参数 说明

    table1_alias

    存在数据倾斜的表的别名

    column_name

    表中存在数据倾斜的列名,必须包含所有用作 Join 条件的列,即 SQL 中 Join 语句的 ON 条件中匹配的列名,多个列名间用英文逗号(,)分隔

    skew_value

    倾斜的键值,倾斜值内部不同列的值用英文逗号(,)分隔,多个倾斜值整体用方括号包裹,例如 [('name1', 100), ('name2', 200)]

    此外,每个倾斜值必须与 column_name 中倾斜列的顺序对应,且不允许出现 null。

    使用示例

    下述案例中,我们设置了 Hint 信息,ArgoDB 在执行 SQL 时,会对 store_returns 表(别名为 sr)中,所有 sr_reason_sk 值为 15 的数据做均匀分发,避免数据倾斜。

    SELECT /*+SKEWJOIN(sr (sr_reason_sk) [(15)])*/
        sr.sr_item_sk,
        r.r_reason_desc,
        sr.sr_returned_date_sk,
        SUM(sr.sr_return_quantity) AS item_returns,
        AVG(sr.sr_return_amt) AS avg_return_amt,
        COUNT(DISTINCT sr.sr_ticket_number) AS unique_tickets,
        STDDEV(sr.sr_return_amt) AS std_dev_return_amt,
        MIN(sr.sr_return_amt) AS min_return_amt,
        MAX(sr.sr_return_amt) AS max_return_amt
    FROM store_returns sr
    JOIN reason r ON sr.sr_reason_sk = r.r_reason_sk
    WHERE sr.sr_reason_sk >= 15 and sr.sr_reason_sk < 16
    GROUP BY sr.sr_item_sk, r.r_reason_desc, sr.sr_returned_date_sk
    ORDER BY item_returns DESC, avg_return_amt DESC
    LIMIT 10;
    复制
  8. (可选)确认 SQL 执行时正确地采用了 Skew Join 优化策略。

    • 通过 EXPLAIN 判断

      在 SQL 语句前加入 EXPLAIN 命令查看执行计划详情,在返回的结果中找到 handleSkewJoin :true,即表示采用了 Skew Join 优化策略。

    • 通过 DBA Service 中的 DAG 图判断

      在 DBA Service 平台的查询页面中,找到执行完毕的查询,查看其 DAG 图,示例如下:

      skew join dag
      图 37. DAG 图示例