
在数据密集型行业中,数据倾斜是一个常见且复杂的问题,其通常表现为某些键值对的分布极不平衡,例如表中的某个键值对应的记录量远超过其他键时,会导致连接(Join)操作中个别节点的处理时间过长,从而成为整个查询过程的瓶颈,影响整个系统的处理效率和性能。
面对数据倾斜问题,传统的方法主要依赖于 MapJoin 或手动改写 SQL 查询语句,然而,这两种方法都有其局限性:
-
MapJoin:要求至少有一侧的表足够小,能够将其加载进内存,过大的表可能引发内存溢出。
-
改写 SQL:需要数据库管理员或数据工程师深入理解业务场景,不仅耗时且容易出错,常常需要反复调试。
面对这一挑战,ArgoDB 推出了 Skew Join 功能,旨在自动化处理数据倾斜问题,优化 Join 操作的性能。与传统方法相比,Skew Join 设置完成后不再需要用户介入,也不限于小表的连接优化,它能够自动处理大表之间的连接操作,有效减少查询响应时间。
适用场景
-
大表与大表的连接操作:当两个较大的表进行连接操作时,如果连接键的分布不均匀,会造成数据倾斜。
-
小表与大表的连接操作:即使是小表与大表的连接,如果大表中某些键值的数据量过大,也可能产生倾斜。
-
键值分布极不均匀:当某些键值对应的记录数量远远超过其他键值时,常规的连接操作会导致数据倾斜。
工作原理

Skew Join 针对倾斜的键值采取特殊处理,将倾斜键值的数据分散到多个节点上进行处理,从而避免单个节点上的负载过重。它的基本步骤包括:
操作 | 说明 |
---|---|
自动处理倾斜优化 |
为表设置倾斜键值并开启自动识别后,无需每次都在 SQL 语句中加入 Hint 信息,可自动识别 SQL 并在执行时采用倾斜优化策略。 |
智能数据分发 |
对倾斜键值进行均匀分发,使其分布到多个计算节点上,非倾斜的键则正常执行 Shuffle 操作。 |
优化连接操作 |
并行处理倾斜键值的数据,并且优化执行计划来减轻单个节点的负载。 |
合并结果 |
将处理后的结果合并,以完成整个连接操作。 |
使用限制
-
仅支持连接中的单张表存在数据倾斜的情况,例如:
-
Inner Join 左右任意一侧倾斜
-
Left Join 的左侧表倾斜
-
Right Join 的右侧表倾斜(实质与 Left Join 左表倾斜等价)
-
-
连接条件需为等值连接,即 SQL 语句中 Join On 条件中存在形如 keyA = keyB 或类似条件,此时 keyA、keyB 就是 join key。
-
对于子查询下的数据倾斜场景,需通过手动指定 Hint 方法来执行 Skew Join 优化策略,暂不支持自动识别。
使用案例
-
登录至 Transwarp Manager 平台。
-
修改 Quark 服务配置。
本方法设置后的参数生效级别为全局,如仅需 Session 级生效,您可以在 SQL 命令行中,通过
SET
命令来完成配置,例如SET quark.skew.join.auto.optimize = true
。-
在页面左侧,单击仪表盘 > 集群,找到并单击 Quark 服务。
-
单击配置页签,然后单击页面右侧的添加自定义参数。
-
在弹出的对话框中,根据下述说明完成参数配置,然后单击确定。
-
配置项:填写为 quark.skew.join.auto.optimize,即开启 Skew Join 优化的自动识别。
-
值:填写为 true。
-
配置文件:选择为 hive-site.xml。
-
-
参考上一步骤,依次添加下述参数,将其值设置为 true,配置文件选择为 hive-site.xml。
-
quark.skewjoin.hint.enable:开启使用 Hint 语法手动指定倾斜键值信息。
-
quark.join.null.optimize:开启 Null 值倾斜优化。
-
-
单击页面右上角的配置服务,在弹出的对话框中单击确定。
-
等待配置应用完成后,单击重启。
重启该服务可能导致业务短暂不可用,请在业务低峰期或运维窗口期操作。
-
-
确认表中的键值是否存在数据倾斜。
下文以 store_returns 表为例,查看 sr_reason_sk 列是否存在值倾斜,您可以替换下述 SQL 中的表名和列名为真实环境中的名称。
WITH partition_percentages AS ( SELECT sr_reason_sk, COUNT(*) * 100.0 / SUM(COUNT(*)) OVER() AS percentage FROM store_returns GROUP BY sr_reason_sk ) SELECT sr_reason_sk, percentage, 'top' AS distribution_type FROM partition_percentages ORDER BY percentage DESC LIMIT 5 UNION ALL SELECT sr_reason_sk, percentage, 'bottom' AS distribution_type FROM partition_percentages ORDER BY percentage ASC LIMIT 5;
复制返回结果如下,可以看到 sr_reason_sk 列中 15 的值占比约 68.42%,键值倾斜非常严重:
+---------------+-----------------------+--------------------+ | sr_reason_sk | percentage | distribution_type | +---------------+-----------------------+--------------------+ | 15 | 68.432423177848438 | top | | 12 | 12.27944843355184288 | top | | 19 | 3.51045386920265545 | top | | NULL | 3.5016363906611824 | top | | 11 | 1.75668204429367795 | top | | 16 | 1.75047265018885171 | bottom | | 20 | 1.75147976668236379 | bottom | | 18 | 1.75281680064788846 | bottom | | 13 | 1.75372667830754421 | bottom | | 8 | 1.75445249674597188 | bottom | +---------------+-----------------------+--------------------+
复制 -
为表增加倾斜的键值信息,设置完成后,ArgoDB 在执行 SQL 语句时会自动应用倾斜优化的策略。
ALTER TABLE <table_name> SKEWED BY (<column_name1>, [column_name2], ...) ON ( (<skew_value_1>, <skew_value_2>), (<skew_value_1>, <skew_value_2>), ... );
复制参数说明:
参数 说明 table_name
存在数据倾斜的表名称。
column_name1,column_name2 …
列名,多个列之间用英文逗号(,)分隔。
skew_value_1,skew_value_2 …
倾斜的值,多个值之间用英文逗号(,)分隔,如果指定了多组倾斜值,则每组值需和指定的列顺序对应。
使用示例:
下述案例中,我们为设置 store_returns 表设置了两个倾斜列(sr_reason_sk 和 sr_item_sk),它们对应的倾斜值分别是
10, 15
和4370, 6601
。ALTER TABLE store_returns SKEWED BY (sr_reason_sk, sr_item_sk) ON ((10, 15), (4370, 6601));
复制 -
(可选)管理表的倾斜键值信息。
-
查看表的 Skew Join 设置
DESC FORMATTED <table_name>;
复制在查询输出的结果中,通过
Skewed Columns
和Skewed Values
的值来查看。 -
删除表的 Skew Join 设置
ALTER TABLE <table_name> NOT SKEWED;
复制
-
-
除通过设置表倾斜的键值信息,您还可以通过 Hint 方法,在执行 SQL 时手动指定倾斜的键值信息,具体语法如下。
SELECT /*+SKEWJOIN(<table1_alias> (<column_name>) [(<skew_value>)], <table2_alias> (<column_name>) [(<skew_value>)]...) */ ...;
复制参数 说明 table1_alias
存在数据倾斜的表的别名
column_name
表中存在数据倾斜的列名,必须包含所有用作 Join 条件的列,即 SQL 中 Join 语句的 ON 条件中匹配的列名,多个列名间用英文逗号(,)分隔
skew_value
倾斜的键值,倾斜值内部不同列的值用英文逗号(,)分隔,多个倾斜值整体用方括号包裹,例如
[('name1', 100), ('name2', 200)]
。此外,每个倾斜值必须与 column_name 中倾斜列的顺序对应,且不允许出现 null。
使用示例:
下述案例中,我们设置了 Hint 信息,ArgoDB 在执行 SQL 时,会对 store_returns 表(别名为 sr)中,所有 sr_reason_sk 值为 15 的数据做均匀分发,避免数据倾斜。
SELECT /*+SKEWJOIN(sr (sr_reason_sk) [(15)])*/ sr.sr_item_sk, r.r_reason_desc, sr.sr_returned_date_sk, SUM(sr.sr_return_quantity) AS item_returns, AVG(sr.sr_return_amt) AS avg_return_amt, COUNT(DISTINCT sr.sr_ticket_number) AS unique_tickets, STDDEV(sr.sr_return_amt) AS std_dev_return_amt, MIN(sr.sr_return_amt) AS min_return_amt, MAX(sr.sr_return_amt) AS max_return_amt FROM store_returns sr JOIN reason r ON sr.sr_reason_sk = r.r_reason_sk WHERE sr.sr_reason_sk >= 15 and sr.sr_reason_sk < 16 GROUP BY sr.sr_item_sk, r.r_reason_desc, sr.sr_returned_date_sk ORDER BY item_returns DESC, avg_return_amt DESC LIMIT 10;
复制 -
(可选)确认 SQL 执行时正确地采用了 Skew Join 优化策略。
-
通过 EXPLAIN 判断
在 SQL 语句前加入 EXPLAIN 命令查看执行计划详情,在返回的结果中找到
handleSkewJoin :true
,即表示采用了 Skew Join 优化策略。 -
通过 DBA Service 中的 DAG 图判断
在 DBA Service 平台的查询页面中,找到执行完毕的查询,查看其 DAG 图,示例如下:
图 37. DAG 图示例
-