JOIN 卡在某个 reducer 上是因为数据倾斜——某键(如用户 ID 为 ’0’ 或空值)在表中出现几百万次,导致全分到同一 task,其他 task 早已完成。

为什么 JOIN 会卡在某个 reducer 上?
因为数据倾斜——某一个 JOIN 键(比如用户 ID 为 '0' 或空值)在左表或右表中出现几百万次,而其他键只出现几次。Spark 或 Hive 执行时,这个键全被分到同一个 task/reducer,其他 task 早跑完了,就它一直 hang 着。
常见错误现象:Stage X skipped, 999 tasks succeeded, 1 task failed;日志里反复看到 Shuffle read size: 2.1 GB 这种远超平均值的记录;YARN UI 上某个 container 的 CPU/ 内存长期 100%。
- 先用
SELECT key, COUNT(*) FROM table GROUP BY key ORDER BY COUNT(*) DESC LIMIT 10找出高频 key - 检查是否含空值、默认值(如
'-1'、'unknown')、测试数据残留(如全填'test_user_001') - 别直接在原 SQL 里加
WHERE key IS NOT NULL就以为完事——如果业务上必须保留这些 key,过滤只会让结果不全
用 salt + map join 拆解倾斜 key
核心思路:把大 key“打散”,让它们不再挤在同一个 reducer。不是改逻辑,是改分布方式。
适用场景:倾斜 key 占比不高(比如前 5 个 key 占总行数
- 对左表,给倾斜 key 随机加盐:
IF(key IN ('0', '-1'), CONCAT(key, '_', FLOOR(RAND() * 10)), key) - 对右表,对应地“复制”倾斜 key:对每个
'0',生成 10 行,key 分别为'0_0','0_1', …,'0_9' - 两表都按新 key join 后,再用
GROUP BY去重聚合(如SUM(value)要除以 salt 数量) - Hive 中可直接用
/*+ MAPJOIN(small_table) */提前广播小表,避免大表倾斜 key 再 shuffle
skewjoin 开关不是万能的
Hive 的 hive.optimize.skewjoin 确实能自动检测并拆分倾斜 key,但它只对 MapJoin 场景生效,且依赖准确的统计信息(ANALYZE TABLE 必须提前跑过)。
容易踩的坑:
- 没开
hive.cbo.enable=true和hive.compute.query.using.stats=true,skewjoin可能压根不触发 - 统计信息过期(比如表刚 insert overwrite 但没 analyze),系统误判“不倾斜”,跳过优化
- 开启后反而变慢:因为自动拆分引入额外 map task 和中间文件,小数据量时得不偿失
- Spark SQL 没等价配置——别指望
spark.sql.adaptive.enabled=true自动解决 JOIN 倾斜,它主要优化 stage 切分和 join 策略选择
分区字段 ≠ JOIN 字段,别硬凑
有人发现表按 dt 分区很快,就想把 JOIN 也改成 ON a.dt = b.dt AND a.id = b.id 来“利用分区”。这是典型误解:分区裁剪只发生在 scan 阶段,对 shuffle 阶段的 key 分布毫无影响。
真正有效的负载均衡,得从 key 本身入手:
- 如果业务允许,把单一大 key 拆成逻辑组(比如按
user_id % 10分桶,再连带分桶字段一起 join) - 用
bucketed table配合CLUSTERED BY (key) INTO N BUCKETS,确保相同 key 总落在同 bucket,便于后续 map join 或 bucket map join - 警惕
CAST或函数包裹 key(如ON MD5(a.id) = MD5(b.id)):这会让原本相同的 key 散开,看似“均衡”,实则破坏关联语义,结果错得离谱
最麻烦的情况是:倾斜 key 就是业务核心(比如平台运营账号、超级大 V),既不能过滤,又无法加盐。这时候得回到数据源头看能不能预聚合,或者接受用 broadcast hint 强制小表广播——但务必确认小表真小,否则 driver OOM 就在下一行日志里等着。